## DataFrame and SQL API in Spark

This notebook uses Spark in local mode to demonstrate Spark's DataFrame and SQL API. I will use a dataset provided by Udacity, which can be found in their free Spark course. It contains log data from Sparkify, a fictional music streaming service.

In [90]:
# imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, desc, asc  # udf: user-defined functions
from pyspark.sql.functions import isnan, count, when, col, sort_array, avg
from pyspark.sql.functions import sum as Fsum  # alias avoids shadowing Python's built-in sum()
from pyspark.sql.types import StringType, IntegerType

import datetime as dt

import numpy as np
import pandas as pd
%matplotlib inline
import matplotlib.pyplot as plt

First we create a SparkSession, the entry point to Spark's DataFrame and SQL API.

In [6]:
spark = SparkSession\
    .builder\
    .appName("Data Wrangling with Spark")\
    .getOrCreate()

#### Get information about the SparkSession

In [7]:
spark.sparkContext.getConf().getAll()

[('spark.driver.host', 'markus-mbp'),
 ('spark.driver.port', '56770'),
 ('spark.app.id', 'local-1588835762415'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.app.name', 'Data Wrangling with Spark')]

In [8]:
spark

#### Load the data
We can use spark.read to load JSON, CSV, text files, etc.

In [59]:
data = spark.read.json('sparkify_log_small.json')

In [12]:
# should return pyspark.sql.dataframe.DataFrame
type(data)

pyspark.sql.dataframe.DataFrame

In [11]:
data

DataFrame[artist: string, auth: string, firstName: string, gender: string, itemInSession: bigint, lastName: string, length: double, level: string, location: string, method: string, page: string, registration: bigint, sessionId: bigint, song: string, status: bigint, ts: bigint, userAgent: string, userId: string]

In [14]:
# returns the columns and their data types
data.printSchema()

root
 |-- artist: string (nullable = true)
 |-- auth: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- itemInSession: long (nullable = true)
 |-- lastName: string (nullable = true)
 |-- length: double (nullable = true)
 |-- level: string (nullable = true)
 |-- location: string (nullable = true)
 |-- method: string (nullable = true)
 |-- page: string (nullable = true)
 |-- registration: long (nullable = true)
 |-- sessionId: long (nullable = true)
 |-- song: string (nullable = true)
 |-- status: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- userAgent: string (nullable = true)
 |-- userId: string (nullable = true)



In [15]:
# shows the first row 
data.head()

Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046')

In [13]:
# returns the first n rows as a list of Row objects
data.take(5)

[Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046'),
 Row(artist='Lily Allen', auth='Logged In', firstName='Elizabeth', gender='F', itemInSession=7, lastName='Chase', length=195.23873, level='free', location='Shreveport-Bossier City, LA', method='PUT', page='NextSong', registration=1512718541284, sessionId=5027, song='Cheryl Tweedy', status=200, ts=1513720878284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.143 Safari/537.36"', userId='1000'),
 Row(artist='Cobra Starship Featuring Leighton Meester', auth='Logged In', firstNa

In [23]:
# returns the number of rows in the DataFrame
data.count()

10000

In [22]:
# computes count, mean, stddev, min and max of a column; .show() displays the result
data.describe('length').show()

+-------+-----------------+
|summary|           length|
+-------+-----------------+
|  count|             8347|
|   mean|249.6486587492506|
| stddev|95.00437130781461|
|    min|          1.12281|
|    max|        1806.8371|
+-------+-----------------+



In [25]:
# select returns a new DataFrame (a transformation: nothing is computed yet)
data.select('artist') 

DataFrame[artist: string]

In [27]:
# .show() is an action that lets us see the result
data.select('artist').show()

+--------------------+
|              artist|
+--------------------+
|       Showaddywaddy|
|          Lily Allen|
|Cobra Starship Fe...|
|          Alex Smoke|
|                null|
|                null|
|              Redman|
|     Ulrich Schnauss|
|                null|
|                null|
|               Jay-Z|
|         Evanescence|
|     Scissor Sisters|
|        3 Doors Down|
|       George Younce|
|              Aly-Us|
|                null|
|            BjÃÂ¶rk|
|      David Bromberg|
|          Nickelback|
+--------------------+
only showing top 20 rows



In [29]:
# shows only the unique values of the artist column
data.select('artist').dropDuplicates().show()

+--------------------+
|              artist|
+--------------------+
|      The Black Keys|
|        STRATOVARIUS|
|      The Chameleons|
|Dashboard Confess...|
|      Jarabe De Palo|
|        Ziggy Marley|
|        Yann Tiersen|
|  The Watts Prophets|
|            Goldfish|
|           Kate Nash|
|              DJ Taz|
|    Jane's Addiction|
|         Eva Cassidy|
|               Rufio|
|           Los Lobos|
|         Silverstein|
|        Rhett Miller|
|              Nebula|
|Yonder Mountain S...|
|        Generation X|
+--------------------+
only showing top 20 rows



In [32]:
# let's query the DataFrame for the first name, last name and gender of paid subscribers
subscriber = data.select(['firstName', 'lastName', 'gender']).where(data.level == 'paid')

In [37]:
# we have to use an action to trigger an output (lazy evaluation)
subscriber.take(10)

[Row(firstName='Kenneth', lastName='Matthews', gender='M'),
 Row(firstName='Vera', lastName='Blackwell', gender='F'),
 Row(firstName='Sophee', lastName='Barker', gender='F'),
 Row(firstName='Gabriel', lastName='Koch', gender='M'),
 Row(firstName='Mason', lastName='Thomas', gender='M'),
 Row(firstName='Micheal', lastName='Morgan', gender='M'),
 Row(firstName='Mason', lastName='Thomas', gender='M'),
 Row(firstName='Justin', lastName='Jones', gender='M'),
 Row(firstName='Zoie', lastName='Wright', gender='F'),
 Row(firstName='Vera', lastName='Blackwell', gender='F')]

In [40]:
subscriber.show(n=5)

+---------+---------+------+
|firstName| lastName|gender|
+---------+---------+------+
|  Kenneth| Matthews|     M|
|     Vera|Blackwell|     F|
|   Sophee|   Barker|     F|
|  Gabriel|     Koch|     M|
|    Mason|   Thomas|     M|
+---------+---------+------+
only showing top 5 rows



In [38]:
# .filter() is an alias for .where()
data.select(['firstName', 'lastName', 'gender']).filter(data.level == 'free').take(5)

[Row(firstName='Elizabeth', lastName='Chase', gender='F'),
 Row(firstName='Jordyn', lastName='Jones', gender='F'),
 Row(firstName='Jordyn', lastName='Jones', gender='F'),
 Row(firstName='Paige', lastName='Hunter', gender='F'),
 Row(firstName='Alexander', lastName='Short', gender='M')]

In [41]:
# let's count the log entries per gender (rows, not distinct users)
# .groupBy() groups the rows, the aggregation .count() counts each group and .show() displays the result
data.groupBy(data.gender).count().show()

+------+-----+
|gender|count|
+------+-----+
|     F| 3820|
|  null|  336|
|     M| 5844|
+------+-----+



In [45]:
# we can convert columns into different types; note that describe() returns all its statistics as strings
data.describe('sessionId')

DataFrame[summary: string, sessionId: string]

In [50]:
# .cast() is like pandas' astype(); on its own it only returns a Column expression
data.sessionId.cast('int')

Column<b'CAST(sessionId AS INT)'>

##### UDF (User defined functions)
A UDF lets us define a Python function and apply it to the columns of a Spark DataFrame.

In [51]:
function = udf(lambda x: x * 2)  # no returnType given, so udf() defaults to StringType

In [60]:
# .withColumn() returns a new DataFrame by adding a column or replacing the existing column with the same name
# status in the first row should now be '400' (a string, because of the UDF's default return type)
data.withColumn('status', function(data.status)).head()

Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status='400', ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046')

##### Remove missing values

In [66]:
# how= can be either 'any' or 'all':
# any = drop a row if it contains any nulls
# all = drop a row only if all its values are null
data_clean = data.dropna(how='any')

In [70]:
# we can also apply our own rules for removing unwanted values
# here we drop rows where userId is an empty string (empty strings are not nulls)
data_clean = data_clean.where(data_clean.userId != '')

In [74]:
# the output should be empty
data_clean.select('userID').where(data.userId == '').show()

+------+
|userID|
+------+
+------+



#### Spark SQL
To use SQL in Spark, we first register the DataFrame as a temporary view (after creating a SparkSession and loading the data).

In [76]:
data.createOrReplaceTempView('data_sql')

In [78]:
# now we can run standard SQL queries against the view
# spark.sql() returns a DataFrame, so we still have to call an action such as .show()
spark.sql('SELECT firstName, lastName, gender FROM data_sql LIMIT 2').show()

+---------+--------+------+
|firstName|lastName|gender|
+---------+--------+------+
|  Kenneth|Matthews|     M|
|Elizabeth|   Chase|     F|
+---------+--------+------+



UDFs can also be used with the SQL API. Let's extract the hour of the day from the timestamp 'ts' (milliseconds since the Unix epoch).

In [82]:
spark.sql('SELECT ts FROM data_sql LIMIT 5').show()

+-------------+
|           ts|
+-------------+
|1513720872284|
|1513720878284|
|1513720881284|
|1513720905284|
|1513720913284|
+-------------+



In [85]:
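# to use the UDF in SQL we need to register it under a name, here 'get_hour'
# without a returnType the results are strings, and fromtimestamp() uses the machine's local time zone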
spark.udf.register('get_hour', lambda x: int(dt.datetime.fromtimestamp(x/1000.0).hour))

<function __main__.<lambda>(x)>

In [89]:
# now we can call get_hour(ts) in the query like a built-in function
spark.sql('SELECT *, get_hour(ts) AS hour FROM data_sql LIMIT 10').head()

Row(artist='Showaddywaddy', auth='Logged In', firstName='Kenneth', gender='M', itemInSession=112, lastName='Matthews', length=232.93342, level='paid', location='Charlotte-Concord-Gastonia, NC-SC', method='PUT', page='NextSong', registration=1509380319284, sessionId=5132, song='Christmas Tears Will Fall', status=200, ts=1513720872284, userAgent='"Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/36.0.1985.125 Safari/537.36"', userId='1046', hour='23')

> by adding .toPandas() at the end we could collect the output directly into a pandas DataFrame

#### Transform the Spark DataFrame into a pandas DataFrame
After using Spark to access data on a distributed system that would be too large for pandas, making some adjustments and extracting only the relevant information, we can transform the (now much smaller) Spark DataFrame into a pandas DataFrame. Keep in mind that .toPandas() collects all rows to the driver, so the result must fit into its memory.

In [42]:
pd_df = data.toPandas() 

In [44]:
pd_df.head()

Unnamed: 0,artist,auth,firstName,gender,itemInSession,lastName,length,level,location,method,page,registration,sessionId,song,status,ts,userAgent,userId
0,Showaddywaddy,Logged In,Kenneth,M,112,Matthews,232.93342,paid,"Charlotte-Concord-Gastonia, NC-SC",PUT,NextSong,1509380000000.0,5132,Christmas Tears Will Fall,200,1513720872284,"""Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537....",1046
1,Lily Allen,Logged In,Elizabeth,F,7,Chase,195.23873,free,"Shreveport-Bossier City, LA",PUT,NextSong,1512719000000.0,5027,Cheryl Tweedy,200,1513720878284,"""Mozilla/5.0 (Windows NT 6.1) AppleWebKit/537....",1000
2,Cobra Starship Featuring Leighton Meester,Logged In,Vera,F,6,Blackwell,196.20526,paid,"Racine, WI",PUT,NextSong,1499856000000.0,5516,Good Girls Go Bad (Feat.Leighton Meester) (Alb...,200,1513720881284,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",2219
3,Alex Smoke,Logged In,Sophee,F,8,Barker,405.99465,paid,"San Luis Obispo-Paso Robles-Arroyo Grande, CA",PUT,NextSong,1513010000000.0,2372,Don't See The Point,200,1513720905284,"""Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebK...",2373
4,,Logged In,Jordyn,F,0,Jones,,free,"Syracuse, NY",GET,Home,1513649000000.0,1746,,200,1513720913284,"""Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_4...",1747
