In [1]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 65kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 44.4MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612242 sha256=0f265ff1390290852cd51b6e1e60eb158b910ac760cfc7876ab21f6f78e777e6
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [1]:
# create entry points to spark
try:
    sc.stop()
except:
    pass
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
sc = SparkContext()
spark = SparkSession(sparkContext=sc)

In [3]:
# rdd are created from python collection types

In [4]:
# from a list
rdd = sc.parallelize([1,2,3])
rdd.collect()

[1, 2, 3]

In [5]:
# from a tuple
rdd = sc.parallelize(('cat', 'dog', 'fish'))
rdd.collect()

['cat', 'dog', 'fish']

In [6]:
# from a list of tuple
list_t = [('cat', 'dog', 'fish'), ('orange', 'apple')]
rdd = sc.parallelize(list_t)
rdd.collect()

[('cat', 'dog', 'fish'), ('orange', 'apple')]

In [7]:
# from a set
s = {'cat', 'dog', 'fish', 'cat', 'dog', 'dog'}
rdd = sc.parallelize(s)
rdd.collect()

['dog', 'fish', 'cat']

In [8]:
# from a dict
d = {
    'a': 100,
    'b': 200,
    'c': 300
}
rdd = sc.parallelize(d)
rdd.collect()

['a', 'b', 'c']

In [16]:
# textFile 
# read a csv file
rdd = sc.textFile('/content/drive/MyDrive/mtcars.csv')
rdd.take(5)

[',mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb',
 'Mazda RX4,21,6,160,110,3.9,2.62,16.46,0,1,4,4',
 'Mazda RX4 Wag,21,6,160,110,3.9,2.875,17.02,0,1,4,4',
 'Datsun 710,22.8,4,108,93,3.85,2.32,18.61,1,1,4,1',
 'Hornet 4 Drive,21.4,6,258,110,3.08,3.215,19.44,1,0,3,1']

In [17]:
mtcars = spark.read.csv(path='/content/drive/MyDrive/mtcars.csv',
                        sep=',',
                        encoding='UTF-8',
                        comment=None,
                        header=True, 
                        inferSchema=True)
mtcars.show(n=5, truncate=False)

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|_c0              |mpg |cyl|disp |hp |drat|wt   |qsec |vs |am |gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|Mazda RX4        |21.0|6  |160.0|110|3.9 |2.62 |16.46|0  |1  |4   |4   |
|Mazda RX4 Wag    |21.0|6  |160.0|110|3.9 |2.875|17.02|0  |1  |4   |4   |
|Datsun 710       |22.8|4  |108.0|93 |3.85|2.32 |18.61|1  |1  |4   |1   |
|Hornet 4 Drive   |21.4|6  |258.0|110|3.08|3.215|19.44|1  |0  |3   |1   |
|Hornet Sportabout|18.7|8  |360.0|175|3.15|3.44 |17.02|0  |0  |3   |2   |
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows



In [18]:
# columns must be defined as rows

from pyspark.sql import Row
rdd = sc.parallelize([
    Row(x=[1,2,3], y=['a','b','c']),
    Row(x=[4,5,6], y=['e','f','g'])
])
rdd.collect()

[Row(x=[1, 2, 3], y=['a', 'b', 'c']), Row(x=[4, 5, 6], y=['e', 'f', 'g'])]

In [19]:
df = spark.createDataFrame(rdd)
df.show()

+---------+---------+
|        x|        y|
+---------+---------+
|[1, 2, 3]|[a, b, c]|
|[4, 5, 6]|[e, f, g]|
+---------+---------+



In [20]:
import pandas as pd
pdf = pd.DataFrame({
    'x': [[1,2,3], [4,5,6]],
    'y': [['a','b','c'], ['e','f','g']]
})
pdf

Unnamed: 0,x,y
0,"[1, 2, 3]","[a, b, c]"
1,"[4, 5, 6]","[e, f, g]"


In [21]:
df = spark.createDataFrame(pdf)
df.show()

+---------+---------+
|        x|        y|
+---------+---------+
|[1, 2, 3]|[a, b, c]|
|[4, 5, 6]|[e, f, g]|
+---------+---------+



In [22]:
my_list = [['a', 1], ['b', 2]]
df = spark.createDataFrame(my_list, ['letter', 'number'])
df.show()

+------+------+
|letter|number|
+------+------+
|     a|     1|
|     b|     2|
+------+------+



In [23]:
df.dtypes

[('letter', 'string'), ('number', 'bigint')]

In [24]:
my_list = [['a', 1], ['b', 2]]
df = spark.createDataFrame(my_list, ['my_column'])
df.show()

+---------+---+
|my_column| _2|
+---------+---+
|        a|  1|
|        b|  2|
+---------+---+



In [25]:
df.dtypes

[('my_column', 'string'), ('_2', 'bigint')]

In [26]:
my_list = [(['a', 1], ['b', 2])]
df = spark.createDataFrame(my_list, ['x', 'y'])
df.show()

+------+------+
|     x|     y|
+------+------+
|[a, 1]|[b, 2]|
+------+------+



In [2]:
# rdd2df

In [3]:
mtcars = spark.read.csv(path='/content/drive/MyDrive/mtcars.csv',
                        sep=',',
                        encoding='UTF-8',
                        comment=None,
                        header=True, 
                        inferSchema=True)

In [4]:
mtcars.rdd.take(2)

[Row(_c0='Mazda RX4', mpg=21.0, cyl=6, disp=160.0, hp=110, drat=3.9, wt=2.62, qsec=16.46, vs=0, am=1, gear=4, carb=4),
 Row(_c0='Mazda RX4 Wag', mpg=21.0, cyl=6, disp=160.0, hp=110, drat=3.9, wt=2.875, qsec=17.02, vs=0, am=1, gear=4, carb=4)]

In [5]:
mtcars_map = mtcars.rdd.map(lambda x: (x['_c0'], x['mpg']))
mtcars_map.take(5)

[('Mazda RX4', 21.0),
 ('Mazda RX4 Wag', 21.0),
 ('Datsun 710', 22.8),
 ('Hornet 4 Drive', 21.4),
 ('Hornet Sportabout', 18.7)]

In [6]:
mtcars_mapvalues = mtcars_map.mapValues(lambda x: [x, x * 10])
mtcars_mapvalues.take(5)

[('Mazda RX4', [21.0, 210.0]),
 ('Mazda RX4 Wag', [21.0, 210.0]),
 ('Datsun 710', [22.8, 228.0]),
 ('Hornet 4 Drive', [21.4, 214.0]),
 ('Hornet Sportabout', [18.7, 187.0])]

In [None]:
# other methods
# map, mapValues, flatMap, flatMapValues 

In [7]:
rdd_raw = sc.textFile('/content/drive/MyDrive/mtcars.csv')
rdd_raw.take(5)

[',mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb',
 'Mazda RX4,21,6,160,110,3.9,2.62,16.46,0,1,4,4',
 'Mazda RX4 Wag,21,6,160,110,3.9,2.875,17.02,0,1,4,4',
 'Datsun 710,22.8,4,108,93,3.85,2.32,18.61,1,1,4,1',
 'Hornet 4 Drive,21.4,6,258,110,3.08,3.215,19.44,1,0,3,1']

In [8]:
header = rdd_raw.map(lambda x: x.split(',')).filter(lambda x: x[1] == 'mpg').collect()[0]
print(header)
header[0] = 'model'
header

['', 'mpg', 'cyl', 'disp', 'hp', 'drat', 'wt', 'qsec', 'vs', 'am', 'gear', 'carb']


['model',
 'mpg',
 'cyl',
 'disp',
 'hp',
 'drat',
 'wt',
 'qsec',
 'vs',
 'am',
 'gear',
 'carb']

In [9]:
rdd = rdd_raw.map(lambda x: x.split(',')).filter(lambda x: x[1] != 'mpg')
rdd.take(2)

[['Mazda RX4',
  '21',
  '6',
  '160',
  '110',
  '3.9',
  '2.62',
  '16.46',
  '0',
  '1',
  '4',
  '4'],
 ['Mazda RX4 Wag',
  '21',
  '6',
  '160',
  '110',
  '3.9',
  '2.875',
  '17.02',
  '0',
  '1',
  '4',
  '4']]

In [10]:
from pyspark.sql import Row
my_dict = dict(zip(['a', 'b', 'c'], range(1, 4)))
Row(**my_dict)

Row(a=1, b=2, c=3)

In [17]:
def list_to_row(keys, values):
    row_dict = dict(zip(keys, values))
    return Row(**row_dict)

In [21]:
print(header)
print(rdd.collect()[0])

['model', 'mpg', 'cyl', 'disp', 'hp', 'drat', 'wt', 'qsec', 'vs', 'am', 'gear', 'carb']
['Mazda RX4', '21', '6', '160', '110', '3.9', '2.62', '16.46', '0', '1', '4', '4']


In [19]:
rdd_rows = rdd.map(lambda x: list_to_row(header, x))
rdd_rows.take(3)

[Row(model='Mazda RX4', mpg='21', cyl='6', disp='160', hp='110', drat='3.9', wt='2.62', qsec='16.46', vs='0', am='1', gear='4', carb='4'),
 Row(model='Mazda RX4 Wag', mpg='21', cyl='6', disp='160', hp='110', drat='3.9', wt='2.875', qsec='17.02', vs='0', am='1', gear='4', carb='4'),
 Row(model='Datsun 710', mpg='22.8', cyl='4', disp='108', hp='93', drat='3.85', wt='2.32', qsec='18.61', vs='1', am='1', gear='4', carb='1')]

In [22]:
df = spark.createDataFrame(rdd_rows)
df.show(5)

+-----------------+----+---+----+---+----+-----+-----+---+---+----+----+
|            model| mpg|cyl|disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+----+---+----+-----+-----+---+---+----+----+
|        Mazda RX4|  21|  6| 160|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    Mazda RX4 Wag|  21|  6| 160|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|       Datsun 710|22.8|  4| 108| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6| 258|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|18.7|  8| 360|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----------------+----+---+----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows



In [23]:
mtcars = spark.read.csv(path='/content/drive/MyDrive/mtcars.csv',
                        sep=',',
                        encoding='UTF-8',
                        comment=None,
                        header=True, 
                        inferSchema=True)

In [24]:
mtcars.show(n=5)

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|              _c0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows



In [25]:
# adjust first column name
colnames = mtcars.columns
colnames[0] = 'model'
mtcars = mtcars.rdd.toDF(colnames)
mtcars.show(5)

+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|            model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|
|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|
|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|
|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|
|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|
+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+
only showing top 5 rows



In [26]:
from pyspark.sql import Row
mtcars_rdd = mtcars.rdd.map(lambda x: Row(model=x[0], values=x[1:]))
mtcars_rdd.take(5)

[Row(model='Mazda RX4', values=(21.0, 6, 160.0, 110, 3.9, 2.62, 16.46, 0, 1, 4, 4)),
 Row(model='Mazda RX4 Wag', values=(21.0, 6, 160.0, 110, 3.9, 2.875, 17.02, 0, 1, 4, 4)),
 Row(model='Datsun 710', values=(22.8, 4, 108.0, 93, 3.85, 2.32, 18.61, 1, 1, 4, 1)),
 Row(model='Hornet 4 Drive', values=(21.4, 6, 258.0, 110, 3.08, 3.215, 19.44, 1, 0, 3, 1)),
 Row(model='Hornet Sportabout', values=(18.7, 8, 360.0, 175, 3.15, 3.44, 17.02, 0, 0, 3, 2))]

In [27]:
mtcars_df = spark.createDataFrame(mtcars_rdd)
mtcars_df.show(5, truncate=False)

+-----------------+-----------------------------------------------------+
|model            |values                                               |
+-----------------+-----------------------------------------------------+
|Mazda RX4        |[21.0, 6, 160.0, 110, 3.9, 2.62, 16.46, 0, 1, 4, 4]  |
|Mazda RX4 Wag    |[21.0, 6, 160.0, 110, 3.9, 2.875, 17.02, 0, 1, 4, 4] |
|Datsun 710       |[22.8, 4, 108.0, 93, 3.85, 2.32, 18.61, 1, 1, 4, 1]  |
|Hornet 4 Drive   |[21.4, 6, 258.0, 110, 3.08, 3.215, 19.44, 1, 0, 3, 1]|
|Hornet Sportabout|[18.7, 8, 360.0, 175, 3.15, 3.44, 17.02, 0, 0, 3, 2] |
+-----------------+-----------------------------------------------------+
only showing top 5 rows



In [28]:
mtcars_rdd_2 = mtcars_df.rdd.map(lambda x: Row(model=x[0], x1=x[1][:5], x2=x[1][5:]))
# convert RDD back to DataFrame
mtcars_df_2 = spark.createDataFrame(mtcars_rdd_2)
mtcars_df_2.show(5, truncate=False)

+-----------------+---------------------------+--------------------------+
|model            |x1                         |x2                        |
+-----------------+---------------------------+--------------------------+
|Mazda RX4        |[21.0, 6, 160.0, 110, 3.9] |[2.62, 16.46, 0, 1, 4, 4] |
|Mazda RX4 Wag    |[21.0, 6, 160.0, 110, 3.9] |[2.875, 17.02, 0, 1, 4, 4]|
|Datsun 710       |[22.8, 4, 108.0, 93, 3.85] |[2.32, 18.61, 1, 1, 4, 1] |
|Hornet 4 Drive   |[21.4, 6, 258.0, 110, 3.08]|[3.215, 19.44, 1, 0, 3, 1]|
|Hornet Sportabout|[18.7, 8, 360.0, 175, 3.15]|[3.44, 17.02, 0, 0, 3, 2] |
+-----------------+---------------------------+--------------------------+
only showing top 5 rows

