## Create SparkContext & SparkSession 

**SparkContext**

In [1]:
from pyspark import SparkConf, SparkContext

# getOrCreate() returns the already-running context when there is one, so this
# cell can be re-run without "Cannot run multiple SparkContexts at once".
# Note: if a context already exists, its settings win and this conf is ignored.
sc = SparkContext.getOrCreate(SparkConf().setMaster('local'))

**SparkSession**

In [3]:
from pyspark.sql import SparkSession

# Build the session (or reuse the active one). The chain is wrapped in
# parentheses so no backslash line continuations are needed.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .config("spark.some.config.option", "some-value")
    .getOrCreate()
)

In [5]:
# Read the mtcars CSV into a DataFrame. The first row supplies the column
# names and Spark is asked to infer the column types from the data.
mtcars = spark.read.csv(
    path='data/mtcars.csv',
    sep=',',
    encoding='UTF-8',
    comment=None,
    header=True,
    inferSchema=True,
)

In [17]:
# Import the type classes this cell needs explicitly, so it runs on a fresh
# kernel without relying on an import that appears in a later cell.
from pyspark.sql.types import ArrayType, StringType, StructField, StructType

# Schema describing an array of structs with two string fields, f1 and f2.
schema = ArrayType(StructType([
    StructField('f1', StringType()),
    StructField('f2', StringType()),
]))

In [26]:
# Peek at the first two records of the DataFrame's underlying RDD (Row objects).
mtcars.rdd.take(2)

[Row(_c0='Mazda RX4', mpg=21.0, cyl=6, disp=160.0, hp=110, drat=3.9, wt=2.62, qsec=16.46, vs=0, am=1, gear=4, carb=4),
 Row(_c0='Mazda RX4 Wag', mpg=21.0, cyl=6, disp=160.0, hp=110, drat=3.9, wt=2.875, qsec=17.02, vs=0, am=1, gear=4, carb=4)]

In [29]:
# Turn each Row into a (car name, mpg) pair; the name lives in column `_c0`.
mtcars_map = mtcars.rdd.map(lambda row: (row['_c0'], row['mpg']))
mtcars_map.take(5)

[('Mazda RX4', 21.0),
 ('Mazda RX4 Wag', 21.0),
 ('Datsun 710', 22.8),
 ('Hornet 4 Drive', 21.4),
 ('Hornet Sportabout', 18.7)]

In [32]:
# mapValues leaves the key (car name) alone and transforms only the value:
# each mpg becomes the two-element list [mpg, mpg * 10].
mtcars_mapvalues = mtcars_map.mapValues(lambda mpg: [mpg, mpg * 10])
mtcars_mapvalues.take(5)

[('Mazda RX4', [21.0, 210.0]),
 ('Mazda RX4 Wag', [21.0, 210.0]),
 ('Datsun 710', [22.8, 228.0]),
 ('Hornet 4 Drive', [21.4, 214.0]),
 ('Hornet Sportabout', [18.7, 187.0])]

In [35]:
from pyspark.sql import Row
from pyspark import RDD

In [36]:
# Exploratory: list every attribute name available on the SparkContext object.
dir(sc)

['PACKAGE_EXTENSIONS',
 '__class__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__enter__',
 '__eq__',
 '__exit__',
 '__format__',
 '__ge__',
 '__getattribute__',
 '__getnewargs__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_accumulatorServer',
 '_active_spark_context',
 '_batchSize',
 '_callsite',
 '_checkpointFile',
 '_conf',
 '_dictToJavaMap',
 '_do_init',
 '_ensure_initialized',
 '_gateway',
 '_getJavaStorageLevel',
 '_initialize_context',
 '_javaAccumulator',
 '_jsc',
 '_jvm',
 '_lock',
 '_next_accum_id',
 '_pickled_broadcast_vars',
 '_python_includes',
 '_temp_dir',
 '_unbatched_serializer',
 'accumulator',
 'addFile',
 'addPyFile',
 'appName',
 'applicationId',
 'binaryFiles',
 'binaryRecords',
 'broadcast',
 'cancelAllJobs',
 'cancelJobGroup',
 'defaultMinPartitions',
 ...]

In [39]:
# Distribute a small local list of Row objects as an RDD and collect it back.
sc.parallelize([Row(x1='a', x2=3), Row(x1='b', x2=4)]).collect()

[Row(x1='a', x2=3), Row(x1='b', x2=4)]

In [40]:
# Same idea with three rows, this time keeping a handle on the RDD.
rdd = sc.parallelize(
    [Row(x1=letter, x2=number) for letter, number in zip('abc', (3, 4, 5))]
)
rdd.collect()

[Row(x1='a', x2=3), Row(x1='b', x2=4), Row(x1='c', x2=5)]

In [34]:
# NOTE(review): wildcard import; nothing in this cell uses a name from
# pyspark.sql.types. Consider importing only the names actually needed.
from pyspark.sql.types import *
# Build a DataFrame from an RDD of [name, mpg] lists. No column names are
# given, so the columns come out as `_1` and `_2` (see the output below).
spark.createDataFrame(mtcars.rdd.map(lambda x: [x['_c0'], x['mpg']]))

DataFrame[_1: string, _2: double]

In [87]:
# Read the CSV as plain text: one string per line, header line included.
rdd_raw = sc.textFile('data/mtcars.csv')
rdd_raw.take(5)

[',mpg,cyl,disp,hp,drat,wt,qsec,vs,am,gear,carb',
 'Mazda RX4,21,6,160,110,3.9,2.62,16.46,0,1,4,4',
 'Mazda RX4 Wag,21,6,160,110,3.9,2.875,17.02,0,1,4,4',
 'Datsun 710,22.8,4,108,93,3.85,2.32,18.61,1,1,4,1',
 'Hornet 4 Drive,21.4,6,258,110,3.08,3.215,19.44,1,0,3,1']

In [88]:
# The column names are on the first line of the file, so read just that line
# instead of splitting and filtering every line and collecting the result.
header = rdd_raw.first().split(',')
# The first header field is empty in the file; give that column a real name.
header[0] = 'model'
header

['model',
 'mpg',
 'cyl',
 'disp',
 'hp',
 'drat',
 'wt',
 'qsec',
 'vs',
 'am',
 'gear',
 'carb']

In [90]:
# Split every line into its fields and drop the header line, which is
# recognised by 'mpg' in its second field.
rdd = rdd_raw.map(lambda line: line.split(',')).filter(lambda fields: fields[1] != 'mpg')
rdd.take(2)

[['Mazda RX4',
  '21',
  '6',
  '160',
  '110',
  '3.9',
  '2.62',
  '16.46',
  '0',
  '1',
  '4',
  '4'],
 ['Mazda RX4 Wag',
  '21',
  '6',
  '160',
  '110',
  '3.9',
  '2.875',
  '17.02',
  '0',
  '1',
  '4',
  '4']]

In [112]:
# A Row can be built from a dict by unpacking it as keyword arguments.
# (The stray `len(header)` expression was dropped: it was not the last line of
# the cell, so its value was never displayed or used.)
my_dict = dict(zip(['a', 'b', 'c'], range(1, 4)))
Row(**my_dict)

Row(a=1, b=2, c=3)

In [118]:

# `row_dict` was not defined anywhere at notebook scope, so this cell failed on
# a fresh kernel. Rebuild it here: each column name from `header` is paired
# with its 1-based position, which matches the recorded output below.
row_dict = dict(zip(header, range(1, len(header) + 1)))
Row(**row_dict)

Row(am=10, carb=12, cyl=3, disp=4, drat=6, gear=11, hp=5, model=1, mpg=2, qsec=8, vs=9, wt=7)

In [113]:
rdd_rows = rdd.map(lambda x: list_to_row(header, x))
rdd_rows.take(3)

[Row(am='1', carb='4', cyl='6', disp='160', drat='3.9', gear='4', hp='110', model='Mazda RX4', mpg='21', qsec='16.46', vs='0', wt='2.62'),
 Row(am='1', carb='4', cyl='6', disp='160', drat='3.9', gear='4', hp='110', model='Mazda RX4 Wag', mpg='21', qsec='17.02', vs='0', wt='2.875'),
 Row(am='1', carb='1', cyl='4', disp='108', drat='3.85', gear='4', hp='93', model='Datsun 710', mpg='22.8', qsec='18.61', vs='1', wt='2.32')]

In [105]:
def list_to_row(keys, values):
    row_dict = dict(zip(keys, values))
    return Row(**row_dict)

In [114]:
# Build a DataFrame from the RDD of Rows. Every value is still a string here,
# because the fields came from splitting raw text lines.
df = spark.createDataFrame(rdd_rows)
df.show(5)

+---+----+---+----+----+----+---+-----------------+----+-----+---+-----+
| am|carb|cyl|disp|drat|gear| hp|            model| mpg| qsec| vs|   wt|
+---+----+---+----+----+----+---+-----------------+----+-----+---+-----+
|  1|   4|  6| 160| 3.9|   4|110|        Mazda RX4|  21|16.46|  0| 2.62|
|  1|   4|  6| 160| 3.9|   4|110|    Mazda RX4 Wag|  21|17.02|  0|2.875|
|  1|   1|  4| 108|3.85|   4| 93|       Datsun 710|22.8|18.61|  1| 2.32|
|  0|   1|  6| 258|3.08|   3|110|   Hornet 4 Drive|21.4|19.44|  1|3.215|
|  0|   2|  8| 360|3.15|   3|175|Hornet Sportabout|18.7|17.02|  0| 3.44|
+---+----+---+----+----+----+---+-----------------+----+-----+---+-----+
only showing top 5 rows



In [127]:
# Collapse all numeric columns into one list of floats per car.
# `model` is selected by name rather than by position: the positional form
# (x[7], x[:7] + x[8:]) only worked because the Row fields happened to be
# sorted alphabetically, which put `model` at index 7. asDict() keeps the
# Row's field order, so `values` keeps the same order as the columns.
rdd_merged = df.rdd.map(
    lambda x: Row(
        model=x['model'],
        values=[float(v) for k, v in x.asDict().items() if k != 'model'],
    )
)
rdd_merged.take(4)

[Row(model='Mazda RX4', values=[1.0, 4.0, 6.0, 160.0, 3.9, 4.0, 110.0, 21.0, 16.46, 0.0, 2.62]),
 Row(model='Mazda RX4 Wag', values=[1.0, 4.0, 6.0, 160.0, 3.9, 4.0, 110.0, 21.0, 17.02, 0.0, 2.875]),
 Row(model='Datsun 710', values=[1.0, 1.0, 4.0, 108.0, 3.85, 4.0, 93.0, 22.8, 18.61, 1.0, 2.32]),
 Row(model='Hornet 4 Drive', values=[0.0, 1.0, 6.0, 258.0, 3.08, 3.0, 110.0, 21.4, 19.44, 1.0, 3.215])]

In [125]:
# Sanity check: concatenating two lists of numeric strings and converting
# each element gives a flat list of floats.
[float(text) for text in ['1'] + ['2']]

[1.0, 2.0]

In [129]:
# Two-column DataFrame: the model name and the list of float values.
df_merged = spark.createDataFrame(rdd_merged)
# truncate=False prints the full list instead of cutting long cells short.
df_merged.show(5, truncate=False)

+-----------------+-----------------------------------------------------------------+
|model            |values                                                           |
+-----------------+-----------------------------------------------------------------+
|Mazda RX4        |[1.0, 4.0, 6.0, 160.0, 3.9, 4.0, 110.0, 21.0, 16.46, 0.0, 2.62]  |
|Mazda RX4 Wag    |[1.0, 4.0, 6.0, 160.0, 3.9, 4.0, 110.0, 21.0, 17.02, 0.0, 2.875] |
|Datsun 710       |[1.0, 1.0, 4.0, 108.0, 3.85, 4.0, 93.0, 22.8, 18.61, 1.0, 2.32]  |
|Hornet 4 Drive   |[0.0, 1.0, 6.0, 258.0, 3.08, 3.0, 110.0, 21.4, 19.44, 1.0, 3.215]|
|Hornet Sportabout|[0.0, 2.0, 8.0, 360.0, 3.15, 3.0, 175.0, 18.7, 17.02, 0.0, 3.44] |
+-----------------+-----------------------------------------------------------------+
only showing top 5 rows



In [133]:
# Split the values list into its first four elements (x1) and the rest (x2).
# Despite the `df_` prefix, the result is an RDD of Rows, not a DataFrame.
df_3_columns = df_merged.rdd.map(
    lambda rec: Row(model=rec[0], x1=rec[1][:4], x2=rec[1][4:])
)
df_3_columns.take(5)

[Row(model='Mazda RX4', x1=[1.0, 4.0, 6.0, 160.0], x2=[3.9, 4.0, 110.0, 21.0, 16.46, 0.0, 2.62]),
 Row(model='Mazda RX4 Wag', x1=[1.0, 4.0, 6.0, 160.0], x2=[3.9, 4.0, 110.0, 21.0, 17.02, 0.0, 2.875]),
 Row(model='Datsun 710', x1=[1.0, 1.0, 4.0, 108.0], x2=[3.85, 4.0, 93.0, 22.8, 18.61, 1.0, 2.32]),
 Row(model='Hornet 4 Drive', x1=[0.0, 1.0, 6.0, 258.0], x2=[3.08, 3.0, 110.0, 21.4, 19.44, 1.0, 3.215]),
 Row(model='Hornet Sportabout', x1=[0.0, 2.0, 8.0, 360.0], x2=[3.15, 3.0, 175.0, 18.7, 17.02, 0.0, 3.44])]

In [134]:
# Convert the RDD of three-field Rows into a DataFrame and print it untruncated.
spark.createDataFrame(df_3_columns).show(5, truncate=False)

+-----------------+----------------------+-------------------------------------------+
|model            |x1                    |x2                                         |
+-----------------+----------------------+-------------------------------------------+
|Mazda RX4        |[1.0, 4.0, 6.0, 160.0]|[3.9, 4.0, 110.0, 21.0, 16.46, 0.0, 2.62]  |
|Mazda RX4 Wag    |[1.0, 4.0, 6.0, 160.0]|[3.9, 4.0, 110.0, 21.0, 17.02, 0.0, 2.875] |
|Datsun 710       |[1.0, 1.0, 4.0, 108.0]|[3.85, 4.0, 93.0, 22.8, 18.61, 1.0, 2.32]  |
|Hornet 4 Drive   |[0.0, 1.0, 6.0, 258.0]|[3.08, 3.0, 110.0, 21.4, 19.44, 1.0, 3.215]|
|Hornet Sportabout|[0.0, 2.0, 8.0, 360.0]|[3.15, 3.0, 175.0, 18.7, 17.02, 0.0, 3.44] |
+-----------------+----------------------+-------------------------------------------+
only showing top 5 rows

