# Usually, there are two popular ways to create the RDDs: loading an external dataset, or distributing a set of collection of objects. The following examples show some simplest ways to create RDDs by using parallelize() fucntion which takes an already existing collection in your program and pass the same to the Spark Context.

### By using parallelize( ) function

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PYthon Spark Create RDD Ex").config('spark.some.config.option', 'some-value')\
.getOrCreate()

In [2]:
df = spark.sparkContext.parallelize([(1,2,3,'a b c'), (4,5,6,'d e f'), (7, 8, 9, 'g h i')]).toDF(['a', 'b', 'c', 'd'])
df.show()

+---+---+---+-----+
|  a|  b|  c|    d|
+---+---+---+-----+
|  1|  2|  3|a b c|
|  4|  5|  6|d e f|
|  7|  8|  9|g h i|
+---+---+---+-----+



In [3]:
myData = spark.sparkContext.parallelize([(1,2), (3,4), (5,6), (7,8), (9,10)])

myData.collect()

[(1, 2), (3, 4), (5, 6), (7, 8), (9, 10)]

### By using createDataFrame( ) function

In [4]:
Employee = spark.createDataFrame([
                        ('1', 'Joe',   '70000', '1'),
                        ('2', 'Henry', '80000', '2'),
                        ('3', 'Sam',   '60000', '2'),
                        ('4', 'Max',   '90000', '1')],
                        ['Id', 'Name', 'Sallary','DepartmentId']
)

Employee.show()

+---+-----+-------+------------+
| Id| Name|Sallary|DepartmentId|
+---+-----+-------+------------+
|  1|  Joe|  70000|           1|
|  2|Henry|  80000|           2|
|  3|  Sam|  60000|           2|
|  4|  Max|  90000|           1|
+---+-----+-------+------------+



### Read dataset from .csv file

In [5]:
df = spark.read.format('com.databricks.spark.csv').options(header = 'true', inferschema = 'true').\
load('Customer-Churn.csv', header = True)

In [6]:
df.show(5)
df.printSchema()

+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|customerID|gender|SeniorCitizen|Partner|Dependents|tenure|PhoneService|   MultipleLines|InternetService|OnlineSecurity|OnlineBackup|DeviceProtection|TechSupport|StreamingTV|StreamingMovies|      Contract|PaperlessBilling|       PaymentMethod|MonthlyCharges|TotalCharges|Churn|
+----------+------+-------------+-------+----------+------+------------+----------------+---------------+--------------+------------+----------------+-----------+-----------+---------------+--------------+----------------+--------------------+--------------+------------+-----+
|7590-VHVEG|Female|            0|    Yes|        No|     1|          No|No phone service|            DSL|            No|         Yes|              No|         No|    

### Read dataset from DataBase

In [7]:
# from pyspark.sql import SparkSession
# spark = SparkSession \
#             .builder \
#             .appName("Python Spark create RDD example") \
#             .config("spark.some.config.option", "some-value") \
#             .getOrCreate()
# ## User information
# user = 'your_username'
# pw   = 'your_password'
# ## Database information
# table_name = 'table_name'
# url = 'jdbc:postgresql://##.###.###.##:5432/dataset?user='+user+'&password='+pw
# properties ={'driver': 'org.postgresql.Driver', 'password': pw,'user
# ˓→': user}
# df = spark.read.jdbc(url=url, table=table_name,properties=properties)
# df.show(5)
# df.printSchema()

In [8]:
import pandas as pd
d = {'A': [0, 1, 0],
     'B': [1, 0, 1],
     'C': [1, 0, 0]}
pd.DataFrame(d)

Unnamed: 0,A,B,C
0,0,1,1
1,1,0,0
2,0,1,0


In [9]:
import numpy as np
a  = np.array(list(d.values()))
b = np.array(list(d.values())).T
c = np.array(list(d.values())).T.tolist()
print('aaaa',a)
print('bbbb',b)
print('cccc',c)
spark.createDataFrame(np.array(list(d.values())).T.tolist(), list(d.keys())).show()

aaaa [[0 1 0]
 [1 0 1]
 [1 0 0]]
bbbb [[0 1 1]
 [1 0 0]
 [0 1 0]]
cccc [[0, 1, 1], [1, 0, 0], [0, 1, 0]]
+---+---+---+
|  A|  B|  C|
+---+---+---+
|  0|  1|  1|
|  1|  0|  0|
|  0|  1|  0|
+---+---+---+



In [10]:
# pd.DataFrame dp: DataFrame pandas
dp = pd.read_csv('Advertising.csv') #rdd.DataFrame. dp: DataFrame spark
ds = spark.read.csv('Advertising.csv',
# # #
sep=',',
encoding='UTF-8',
comment=None,
header=True,
inferSchema=True)
print(dp)
ds.show()

     Unnamed: 0     TV  Radio  Newspaper  Sales
0             1  230.1   37.8       69.2   22.1
1             2   44.5   39.3       45.1   10.4
2             3   17.2   45.9       69.3    9.3
3             4  151.5   41.3       58.5   18.5
4             5  180.8   10.8       58.4   12.9
..          ...    ...    ...        ...    ...
195         196   38.2    3.7       13.8    7.6
196         197   94.2    4.9        8.1    9.7
197         198  177.0    9.3        6.4   12.8
198         199  283.6   42.0       66.2   25.5
199         200  232.1    8.6        8.7   13.4

[200 rows x 5 columns]
+---+-----+-----+---------+-----+
|_c0|   TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
|  6|  8.7| 48.9|     75.0|  7.2|
|  7| 57.5| 32.8|     23.5| 11.8|
|  8|120.2| 19.6|     11.6| 13.2|
|  9|  8.6|  2.1|      1.0

In [11]:
#dp = pd.read_json("http://api.luftdaten.info/static/v1/data.json")


In [12]:
#ds = spark.read.json('data.json')
#ds.show(5)

In [13]:
#dp[['id','timestamp']].head(4) # 
#ds[['id','timestamp']].show(4)

In [14]:
ds.columns

['_c0', 'TV', 'Radio', 'Newspaper', 'Sales']

In [15]:
my_list = [['male', 1, None], ['female', 2, 3],['male', 3, 4]] 
dp = pd.DataFrame(my_list,columns=['A', 'B', 'C'])
ds = spark.createDataFrame(my_list, ['A', 'B', 'C'])
#
dp.head()
ds.show()

+------+---+----+
|     A|  B|   C|
+------+---+----+
|  male|  1|null|
|female|  2|   3|
|  male|  3|   4|
+------+---+----+



In [16]:
ds.fillna(-99).show()

+------+---+---+
|     A|  B|  C|
+------+---+---+
|  male|  1|-99|
|female|  2|  3|
|  male|  3|  4|
+------+---+---+



In [17]:
# caution: you need to chose specific col
dp.A.replace(['male', 'female'],[1, 0], inplace=True) 
print(dp)
#caution: Mixed type replacements are not supported 
ds.na.replace(['male','female'],['1','0']).show()

   A  B    C
0  1  1  NaN
1  0  2  3.0
2  1  3  4.0
+---+---+----+
|  A|  B|   C|
+---+---+----+
|  1|  1|null|
|  0|  2|   3|
|  1|  3|   4|
+---+---+----+



In [18]:
dp = pd.read_csv('Advertising.csv') #rdd.DataFrame. dp: DataFrame pandas
ds = spark.read.csv('Advertising.csv',sep=',',
encoding='UTF-8',
comment=None,
header=True,
inferSchema=True)
mapping = {'Newspaper':'C','Sales':'D'}
dp.rename(columns=mapping).head(4)

Unnamed: 0.1,Unnamed: 0,TV,Radio,C,D
0,1,230.1,37.8,69.2,22.1
1,2,44.5,39.3,45.1,10.4
2,3,17.2,45.9,69.3,9.3
3,4,151.5,41.3,58.5,18.5


In [19]:
ds.show()
new_names = [mapping.get(col,col) for col in ds.columns] 
print(mapping.get(1,'Radio'))
print(new_names)
print(ds.columns)
ds.toDF(*new_names).show(4)

+---+-----+-----+---------+-----+
|_c0|   TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
|  1|230.1| 37.8|     69.2| 22.1|
|  2| 44.5| 39.3|     45.1| 10.4|
|  3| 17.2| 45.9|     69.3|  9.3|
|  4|151.5| 41.3|     58.5| 18.5|
|  5|180.8| 10.8|     58.4| 12.9|
|  6|  8.7| 48.9|     75.0|  7.2|
|  7| 57.5| 32.8|     23.5| 11.8|
|  8|120.2| 19.6|     11.6| 13.2|
|  9|  8.6|  2.1|      1.0|  4.8|
| 10|199.8|  2.6|     21.2| 10.6|
| 11| 66.1|  5.8|     24.2|  8.6|
| 12|214.7| 24.0|      4.0| 17.4|
| 13| 23.8| 35.1|     65.9|  9.2|
| 14| 97.5|  7.6|      7.2|  9.7|
| 15|204.1| 32.9|     46.0| 19.0|
| 16|195.4| 47.7|     52.9| 22.4|
| 17| 67.8| 36.6|    114.0| 12.5|
| 18|281.4| 39.6|     55.8| 24.4|
| 19| 69.2| 20.5|     18.3| 11.3|
| 20|147.3| 23.9|     19.1| 14.6|
+---+-----+-----+---------+-----+
only showing top 20 rows

Radio
['_c0', 'TV', 'Radio', 'C', 'D']
['_c0', 'TV', 'Radio', 'Newspaper', 'Sales']
+---+-----+-----+----+----+
|_c0|   TV|Radio|   C|   D|
+---+-----+-----+-

In [20]:
ds.withColumnRenamed('Newspaper','Paper').show(4)

+---+-----+-----+-----+-----+
|_c0|   TV|Radio|Paper|Sales|
+---+-----+-----+-----+-----+
|  1|230.1| 37.8| 69.2| 22.1|
|  2| 44.5| 39.3| 45.1| 10.4|
|  3| 17.2| 45.9| 69.3|  9.3|
|  4|151.5| 41.3| 58.5| 18.5|
+---+-----+-----+-----+-----+
only showing top 4 rows



In [21]:
ds.withColumnRenamed('Sales','derniere_sales').show(4)

+---+-----+-----+---------+--------------+
|_c0|   TV|Radio|Newspaper|derniere_sales|
+---+-----+-----+---------+--------------+
|  1|230.1| 37.8|     69.2|          22.1|
|  2| 44.5| 39.3|     45.1|          10.4|
|  3| 17.2| 45.9|     69.3|           9.3|
|  4|151.5| 41.3|     58.5|          18.5|
+---+-----+-----+---------+--------------+
only showing top 4 rows



## Drop Columns

In [22]:
drop_name = ['Newspaper','Sales']
dp.drop(drop_name,axis=1).head(4)

Unnamed: 0.1,Unnamed: 0,TV,Radio
0,1,230.1,37.8
1,2,44.5,39.3
2,3,17.2,45.9
3,4,151.5,41.3


In [23]:
ds.drop(*drop_name).show(4)

+---+-----+-----+
|_c0|   TV|Radio|
+---+-----+-----+
|  1|230.1| 37.8|
|  2| 44.5| 39.3|
|  3| 17.2| 45.9|
|  4|151.5| 41.3|
+---+-----+-----+
only showing top 4 rows



In [24]:
dp[(dp.Newspaper<20)&(dp.TV>100)].head(4)

Unnamed: 0.1,Unnamed: 0,TV,Radio,Newspaper,Sales
7,8,120.2,19.6,11.6,13.2
11,12,214.7,24.0,4.0,17.4
19,20,147.3,23.9,19.1,14.6
25,26,262.9,3.5,19.5,12.0


In [25]:
ds[(ds.Newspaper<20)&(ds.TV>100)].show(4)

+---+-----+-----+---------+-----+
|_c0|   TV|Radio|Newspaper|Sales|
+---+-----+-----+---------+-----+
|  8|120.2| 19.6|     11.6| 13.2|
| 12|214.7| 24.0|      4.0| 17.4|
| 20|147.3| 23.9|     19.1| 14.6|
| 26|262.9|  3.5|     19.5| 12.0|
+---+-----+-----+---------+-----+
only showing top 4 rows



In [26]:
dp['tv_norm'] = dp.TV/sum(dp.TV)
dp.head()

Unnamed: 0.1,Unnamed: 0,TV,Radio,Newspaper,Sales,tv_norm
0,1,230.1,37.8,69.2,22.1,0.007824
1,2,44.5,39.3,45.1,10.4,0.001513
2,3,17.2,45.9,69.3,9.3,0.000585
3,4,151.5,41.3,58.5,18.5,0.005152
4,5,180.8,10.8,58.4,12.9,0.006148


In [27]:
import pyspark.sql.functions as F
ds.groupBy().agg(F.sum("TV")).collect()[0][0]

29408.499999999996

In [28]:
ds.withColumn('tv_norm', ds.TV/ds.groupBy().agg(F.sum("TV")).collect()[0][0]).show(4)

+---+-----+-----+---------+-----+--------------------+
|_c0|   TV|Radio|Newspaper|Sales|             tv_norm|
+---+-----+-----+---------+-----+--------------------+
|  1|230.1| 37.8|     69.2| 22.1|0.007824268493802813|
|  2| 44.5| 39.3|     45.1| 10.4|0.001513167961643...|
|  3| 17.2| 45.9|     69.3|  9.3|5.848649200061207E-4|
|  4|151.5| 41.3|     58.5| 18.5|0.005151571824472517|
+---+-----+-----+---------+-----+--------------------+
only showing top 4 rows



In [29]:
dp['log_tv'] = np.log(dp.TV)
print(dp.head(4))
#
import pyspark.sql.functions as F 
ds.withColumn('log_tv',F.log(ds.TV)).show(4)

   Unnamed: 0     TV  Radio  Newspaper  Sales   tv_norm    log_tv
0           1  230.1   37.8       69.2   22.1  0.007824  5.438514
1           2   44.5   39.3       45.1   10.4  0.001513  3.795489
2           3   17.2   45.9       69.3    9.3  0.000585  2.844909
3           4  151.5   41.3       58.5   18.5  0.005152  5.020586
+---+-----+-----+---------+-----+------------------+
|_c0|   TV|Radio|Newspaper|Sales|            log_tv|
+---+-----+-----+---------+-----+------------------+
|  1|230.1| 37.8|     69.2| 22.1|  5.43851399704132|
|  2| 44.5| 39.3|     45.1| 10.4|3.7954891891721947|
|  3| 17.2| 45.9|     69.3|  9.3|2.8449093838194073|
|  4|151.5| 41.3|     58.5| 18.5| 5.020585624949424|
+---+-----+-----+---------+-----+------------------+
only showing top 4 rows



In [30]:
import pyspark.sql.functions as F 
ds.withColumn('ekstra',ds.TV/ds.groupBy().agg(F.sum('TV')).collect()[0][0]).show(5)

+---+-----+-----+---------+-----+--------------------+
|_c0|   TV|Radio|Newspaper|Sales|              ekstra|
+---+-----+-----+---------+-----+--------------------+
|  1|230.1| 37.8|     69.2| 22.1|0.007824268493802813|
|  2| 44.5| 39.3|     45.1| 10.4|0.001513167961643...|
|  3| 17.2| 45.9|     69.3|  9.3|5.848649200061207E-4|
|  4|151.5| 41.3|     58.5| 18.5|0.005151571824472517|
|  5|180.8| 10.8|     58.4| 12.9|0.006147882414948061|
+---+-----+-----+---------+-----+--------------------+
only showing top 5 rows



In [31]:
dp['tv+10'] = dp.TV.apply(lambda x: x+10) 
print(dp.head(4))
#
ds.withColumn('tv+10', ds.TV+10).show(4)
ds

   Unnamed: 0     TV  Radio  Newspaper  Sales   tv_norm    log_tv  tv+10
0           1  230.1   37.8       69.2   22.1  0.007824  5.438514  240.1
1           2   44.5   39.3       45.1   10.4  0.001513  3.795489   54.5
2           3   17.2   45.9       69.3    9.3  0.000585  2.844909   27.2
3           4  151.5   41.3       58.5   18.5  0.005152  5.020586  161.5
+---+-----+-----+---------+-----+-----+
|_c0|   TV|Radio|Newspaper|Sales|tv+10|
+---+-----+-----+---------+-----+-----+
|  1|230.1| 37.8|     69.2| 22.1|240.1|
|  2| 44.5| 39.3|     45.1| 10.4| 54.5|
|  3| 17.2| 45.9|     69.3|  9.3| 27.2|
|  4|151.5| 41.3|     58.5| 18.5|161.5|
+---+-----+-----+---------+-----+-----+
only showing top 4 rows



DataFrame[_c0: int, TV: double, Radio: double, Newspaper: double, Sales: double]

### JOIN

In [32]:
leftp = pd.DataFrame({'A': ['A0', 'A1', 'A2', 'A3'],
                    'B': ['B0', 'B1', 'B2', 'B3'],
                    'C': ['C0', 'C1', 'C2', 'C3'],
                    'D': ['D0', 'D1', 'D2', 'D3']},
                    index=[0, 1, 2, 3])
print(leftp)
rightp = pd.DataFrame({'A': ['A0', 'A1', 'A6', 'A7'],
                       'F': ['B4', 'B5', 'B6', 'B7'],
                       'G': ['C4', 'C5', 'C6', 'C7'],
                       'H': ['D4', 'D5', 'D6', 'D7']},
                       index=[4, 5, 6, 7])
print(rightp)
lefts = spark.createDataFrame(leftp)
rights = spark.createDataFrame(rightp)

    A   B   C   D
0  A0  B0  C0  D0
1  A1  B1  C1  D1
2  A2  B2  C2  D2
3  A3  B3  C3  D3
    A   F   G   H
4  A0  B4  C4  D4
5  A1  B5  C5  D5
6  A6  B6  C6  D6
7  A7  B7  C7  D7


In [33]:
## Left Join
leftp.merge(rightp,on='A',how='left') # 


Unnamed: 0,A,B,C,D,F,G,H
0,A0,B0,C0,D0,B4,C4,D4
1,A1,B1,C1,D1,B5,C5,D5
2,A2,B2,C2,D2,,,
3,A3,B3,C3,D3,,,


In [34]:
lefts.join(rights,on='A',how='left').orderBy('A',ascending=True).show()

+---+---+---+---+----+----+----+
|  A|  B|  C|  D|   F|   G|   H|
+---+---+---+---+----+----+----+
| A0| B0| C0| D0|  B4|  C4|  D4|
| A1| B1| C1| D1|  B5|  C5|  D5|
| A2| B2| C2| D2|null|null|null|
| A3| B3| C3| D3|null|null|null|
+---+---+---+---+----+----+----+



In [35]:
## Right Join

In [36]:
leftp.merge(rightp, on = 'A', how = 'right')

Unnamed: 0,A,B,C,D,F,G,H
0,A0,B0,C0,D0,B4,C4,D4
1,A1,B1,C1,D1,B5,C5,D5
2,A6,,,,B6,C6,D6
3,A7,,,,B7,C7,D7


In [37]:
lefts.join(rights,on='A',how='right').orderBy('A',ascending=True).show()

+---+----+----+----+---+---+---+
|  A|   B|   C|   D|  F|  G|  H|
+---+----+----+----+---+---+---+
| A0|  B0|  C0|  D0| B4| C4| D4|
| A1|  B1|  C1|  D1| B5| C5| D5|
| A6|null|null|null| B6| C6| D6|
| A7|null|null|null| B7| C7| D7|
+---+----+----+----+---+---+---+



In [38]:
leftp.merge(rightp,on='A',how='outer') #
print(leftp)
lefts.join(rights,on='A',how='full').orderBy('A',ascending=True).show()

    A   B   C   D
0  A0  B0  C0  D0
1  A1  B1  C1  D1
2  A2  B2  C2  D2
3  A3  B3  C3  D3
+---+----+----+----+----+----+----+
|  A|   B|   C|   D|   F|   G|   H|
+---+----+----+----+----+----+----+
| A0|  B0|  C0|  D0|  B4|  C4|  D4|
| A1|  B1|  C1|  D1|  B5|  C5|  D5|
| A2|  B2|  C2|  D2|null|null|null|
| A3|  B3|  C3|  D3|null|null|null|
| A6|null|null|null|  B6|  C6|  D6|
| A7|null|null|null|  B7|  C7|  D7|
+---+----+----+----+----+----+----+



### Concat Columns

In [39]:
my_list = [('a', 2, 3),
           ('b', 5, 6),
           ('c', 8, 9),
           ('a', 2, 3),
           ('b', 5, 6),
           ('c', 8, 9)]
col_name = ['col1', 'col2', 'col3']
#
dp = pd.DataFrame(my_list,columns=col_name)
print(dp)
ds = spark.createDataFrame(my_list,schema=col_name)
ds

  col1  col2  col3
0    a     2     3
1    b     5     6
2    c     8     9
3    a     2     3
4    b     5     6
5    c     8     9


DataFrame[col1: string, col2: bigint, col3: bigint]

In [40]:
dp['concat'] = dp.apply(lambda x:'%s%s'%(x['col1'],x['col2']),axis=1)
#
print(dp)
ds.withColumn('concat',F.concat('col1','col2')).show()

  col1  col2  col3 concat
0    a     2     3     a2
1    b     5     6     b5
2    c     8     9     c8
3    a     2     3     a2
4    b     5     6     b5
5    c     8     9     c8
+----+----+----+------+
|col1|col2|col3|concat|
+----+----+----+------+
|   a|   2|   3|    a2|
|   b|   5|   6|    b5|
|   c|   8|   9|    c8|
|   a|   2|   3|    a2|
|   b|   5|   6|    b5|
|   c|   8|   9|    c8|
+----+----+----+------+



### GroupBy

In [41]:
dp = dp.groupby(['col1']).agg({'col2':'min','col3':'mean'})
print(dp)
#
ds.groupBy(['col1']).agg({'col2': 'min', 'col3': 'avg'}).show()

      col2  col3
col1            
a        2     3
b        5     6
c        8     9
+----+---------+---------+
|col1|min(col2)|avg(col3)|
+----+---------+---------+
|   c|        8|      9.0|
|   b|        5|      6.0|
|   a|        2|      3.0|
+----+---------+---------+



### Pivot

In [42]:
pd2 = pd.pivot_table(dp, values='col3', index='col1', columns='col2', aggfunc=np.sum)
print(pd2)
#
ds.groupBy(['col1']).pivot('col2').sum('col3').show()

col2    2    5    8
col1               
a     3.0  NaN  NaN
b     NaN  6.0  NaN
c     NaN  NaN  9.0
+----+----+----+----+
|col1|   2|   5|   8|
+----+----+----+----+
|   c|null|null|  18|
|   b|null|  12|null|
|   a|   6|null|null|
+----+----+----+----+



### Window

In [50]:
d = {'A':['a','b','c','d'],'B':['m','m','n','n'],'C':[1,2,3,6]}
dp = pd.DataFrame(d)
ds = spark.createDataFrame(dp)

In [51]:
dp

Unnamed: 0,A,B,C
0,a,m,1
1,b,m,2
2,c,n,3
3,d,n,6


In [53]:
dp['rank'] = dp.groupby('B')['C'].rank('dense',ascending=False) #
print(dp)
from pyspark.sql.window import Window
w = Window.partitionBy('B').orderBy(ds.C.desc())
ds = ds.withColumn('rank',F.rank().over(w)).show()

   A  B  C  rank
0  a  m  1   2.0
1  b  m  2   1.0
2  c  n  3   2.0
3  d  n  6   1.0
+---+---+---+----+
|  A|  B|  C|rank|
+---+---+---+----+
|  b|  m|  2|   1|
|  a|  m|  1|   2|
|  d|  n|  6|   1|
|  c|  n|  3|   2|
+---+---+---+----+

