# Pyspark Dataframe Basics

In [1]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Review2").getOrCreate()

# print
cores = spark._jsc.sc().getExecutorMemoryStatus().keySet().size()
appid = spark._jsc.sc().applicationId()
print("Working with", cores, "core(s) on appid: ",appid)
spark

Working with 1 core(s) on appid:  local-1608542264352


## Reading in data

In [2]:
path ="../Datasets/"

# how to read csv and json
df = spark.read.csv(path+'students.csv', inferSchema=True,header=True)
read_json = spark.read.json(path+'people.json')

# how to read partquet
read_parquet = spark.read.parquet(path+'users1.parquet')
read_parquets = spark.read.parquet(path+'users*')
read_some_parquets = spark.read.option("basePath", path).parquet(path+'users1.parquet', path+'users2.parquet')

### PySpark vs Pandas Dataframes
Not exactly same. PySpark has limitations.

In [3]:
type(df)

pyspark.sql.dataframe.DataFrame

In [4]:
df2 = df.toPandas()
type(df2)

pandas.core.frame.DataFrame

## Inspection
Always a good idea to do this to ensure that dataframe was read in correctly.

In [5]:
# top 3
df.show(3)

+------+--------------+---------------------------+--------+-----------------------+----------+-------------+-------------+
|gender|race/ethnicity|parental level of education|   lunch|test preparation course|math score|reading score|writing score|
+------+--------------+---------------------------+--------+-----------------------+----------+-------------+-------------+
|female|       group B|          bachelor's degree|standard|                   none|        72|           72|           74|
|female|       group C|               some college|standard|              completed|        69|           90|           88|
|female|       group B|            master's degree|standard|                   none|        90|           95|           93|
+------+--------------+---------------------------+--------+-----------------------+----------+-------------+-------------+
only showing top 3 rows



In [6]:
# schema
df.printSchema()

root
 |-- gender: string (nullable = true)
 |-- race/ethnicity: string (nullable = true)
 |-- parental level of education: string (nullable = true)
 |-- lunch: string (nullable = true)
 |-- test preparation course: string (nullable = true)
 |-- math score: integer (nullable = true)
 |-- reading score: integer (nullable = true)
 |-- writing score: integer (nullable = true)



In [7]:
# data types
df.dtypes

[('gender', 'string'),
 ('race/ethnicity', 'string'),
 ('parental level of education', 'string'),
 ('lunch', 'string'),
 ('test preparation course', 'string'),
 ('math score', 'int'),
 ('reading score', 'int'),
 ('writing score', 'int')]

In [8]:
# list of columns
df.columns

['gender',
 'race/ethnicity',
 'parental level of education',
 'lunch',
 'test preparation course',
 'math score',
 'reading score',
 'writing score']

In [9]:
# data types for one column
df.schema['reading score'].dataType

IntegerType

## Inspection using regular Pandas

In [10]:
# If your dataframe is more than just a few variables, this method is way better
df.limit(5).toPandas()

Unnamed: 0,gender,race/ethnicity,parental level of education,lunch,test preparation course,math score,reading score,writing score
0,female,group B,bachelor's degree,standard,none,72,72,74
1,female,group C,some college,standard,completed,69,90,88
2,female,group B,master's degree,standard,none,90,95,93
3,male,group A,associate's degree,free/reduced,none,47,57,44
4,male,group C,some college,standard,none,76,78,75


In [11]:
# Neat "describe" function
df.describe(['race/ethnicity']).show()

+-------+--------------+
|summary|race/ethnicity|
+-------+--------------+
|  count|          1000|
|   mean|          null|
| stddev|          null|
|    min|       group A|
|    max|       group E|
+-------+--------------+



In [12]:
# Summary function
df.select("math score", "reading score", "writing score").summary("count", "min", "25%", "75%", "max").show()

+-------+----------+-------------+-------------+
|summary|math score|reading score|writing score|
+-------+----------+-------------+-------------+
|  count|      1000|         1000|         1000|
|    min|         0|           17|           10|
|    25%|        57|           59|           57|
|    75%|        77|           79|           79|
|    max|       100|          100|          100|
+-------+----------+-------------+-------------+



## Specify data types as you read in datasets.

Spark data types : https://spark.apache.org/docs/latest/sql-reference.html

In [13]:
from pyspark.sql.types import * #StructField,StringType,IntegerType,StructType

In [14]:
data_schema = [StructField("age", IntegerType(), True),StructField("name", StringType(), True)]
final_struc = StructType(fields=data_schema)
people = spark.read.json(path+'people.json', schema=final_struc)

## Writing Data

In [15]:
# write dataframe to local
df.write.mode("overwrite").csv('write_test.csv')

**Parquet files**

In [17]:
# replace apostrophes
from pyspark.sql.functions import *
df = df.withColumnRenamed('parental level of education', 'ParentalLevelOfEducation')
df = df.withColumnRenamed('test preparation course', 'TestPreparationCourse')
df = df.withColumnRenamed('reading score', 'ReadingScore')
df = df.withColumnRenamed('math score', 'MathScore')
df = df.withColumnRenamed('writing score', 'WritingScore')

# write parquet
df.write.mode("overwrite").parquet("parquet/")

#### Writting your own dataframes in Jupyter Notebooks!

In [18]:
values = [('Pear',10),('Orange',36),('Banana',123),('Kiwi',48),('Peach',16),('Strawberry',1)]
mydf = spark.createDataFrame(values,['fruit','quantity'])
mydf.show()

+----------+--------+
|     fruit|quantity|
+----------+--------+
|      Pear|      10|
|    Orange|      36|
|    Banana|     123|
|      Kiwi|      48|
|     Peach|      16|
|Strawberry|       1|
+----------+--------+



## Select Data

In [29]:
from pyspark.sql.functions import *

**Basic Select**

In [20]:
df.select(['gender','race/ethnicity']).show(5)

+------+--------------+
|gender|race/ethnicity|
+------+--------------+
|female|       group B|
|female|       group C|
|female|       group B|
|  male|       group A|
|  male|       group C|
+------+--------------+
only showing top 5 rows



**Order By**

In [22]:
df.select(['gender','race/ethnicity','MathScore']).orderBy("MathScore").show(5)

+------+--------------+---------+
|gender|race/ethnicity|MathScore|
+------+--------------+---------+
|female|       group C|        0|
|female|       group B|        8|
|female|       group B|       18|
|female|       group B|       19|
|female|       group C|       22|
+------+--------------+---------+
only showing top 5 rows



**Order By Descending**

In [25]:
df.select(['gender','race/ethnicity','MathScore']).orderBy(df["MathScore"].desc()).show(5)

+------+--------------+---------+
|gender|race/ethnicity|MathScore|
+------+--------------+---------+
|  male|       group E|      100|
|female|       group E|      100|
|female|       group E|      100|
|  male|       group A|      100|
|  male|       group D|      100|
+------+--------------+---------+
only showing top 5 rows



**Like**

In [26]:
df.show(10)

+------+--------------+------------------------+------------+---------------------+---------+------------+------------+
|gender|race/ethnicity|ParentalLevelOfEducation|       lunch|TestPreparationCourse|MathScore|ReadingScore|WritingScore|
+------+--------------+------------------------+------------+---------------------+---------+------------+------------+
|female|       group B|       bachelor's degree|    standard|                 none|       72|          72|          74|
|female|       group C|            some college|    standard|            completed|       69|          90|          88|
|female|       group B|         master's degree|    standard|                 none|       90|          95|          93|
|  male|       group A|      associate's degree|free/reduced|                 none|       47|          57|          44|
|  male|       group C|            some college|    standard|                 none|       76|          78|          75|
|female|       group B|      associate's

In [43]:
df.where(df['ParentalLevelOfEducation'].like("%degree%")).show(5, False)

+------+--------------+------------------------+------------+---------------------+---------+------------+------------+
|gender|race/ethnicity|ParentalLevelOfEducation|lunch       |TestPreparationCourse|MathScore|ReadingScore|WritingScore|
+------+--------------+------------------------+------------+---------------------+---------+------------+------------+
|female|group B       |bachelor's degree       |standard    |none                 |72       |72          |74          |
|female|group B       |master's degree         |standard    |none                 |90       |95          |93          |
|male  |group A       |associate's degree      |free/reduced|none                 |47       |57          |44          |
|female|group B       |associate's degree      |standard    |none                 |71       |83          |78          |
|male  |group C       |associate's degree      |standard    |none                 |58       |54          |52          |
+------+--------------+-----------------

**Substrings**

In [44]:
df.select(df['ParentalLevelOfEducation'].substr(1,3)).show(5,False)

+-----------------------------------------+
|substring(ParentalLevelOfEducation, 1, 3)|
+-----------------------------------------+
|bac                                      |
|som                                      |
|mas                                      |
|ass                                      |
|som                                      |
+-----------------------------------------+
only showing top 5 rows



**IS IN**

In [45]:
df[df['race/ethnicity'].isin("group A", 'group B')].limit(4).toPandas()

Unnamed: 0,gender,race/ethnicity,ParentalLevelOfEducation,lunch,TestPreparationCourse,MathScore,ReadingScore,WritingScore
0,female,group B,bachelor's degree,standard,none,72,72,74
1,female,group B,master's degree,standard,none,90,95,93
2,male,group A,associate's degree,free/reduced,none,47,57,44
3,female,group B,associate's degree,standard,none,71,83,78


**Starts with Ends with**

Search for a specific case - begins with "x" and ends with "x"

In [47]:
df\
    .where(df['ParentalLevelOfEducation'].startswith("bachelor")) \
    .where(df['ParentalLevelOfEducation'].endswith("degree")) \
    .limit(4).toPandas()

Unnamed: 0,gender,race/ethnicity,ParentalLevelOfEducation,lunch,TestPreparationCourse,MathScore,ReadingScore,WritingScore
0,female,group B,bachelor's degree,standard,none,72,72,74
1,male,group D,bachelor's degree,free/reduced,completed,74,71,80
2,female,group C,bachelor's degree,standard,none,67,69,75
3,male,group E,bachelor's degree,free/reduced,completed,79,74,72


**Slicing**

pyspark.sql.functions.slice(x, start, length)

In [50]:
from pyspark.sql.functions import slice

# create and show df with arrays
mydf = spark.createDataFrame([([1, 2, 3, 4],), ([5, 6, 7, 8],)], ['x']) 
mydf.show()

# slice elements in df, from index 2, for 2 values
mydf.select(slice(mydf.x, 2, 2).alias("Middle 2 Values")).show()

+------------+
|           x|
+------------+
|[1, 2, 3, 4]|
|[5, 6, 7, 8]|
+------------+

+---------------+
|Middle 2 Values|
+---------------+
|         [2, 3]|
|         [6, 7]|
+---------------+



If you want to just slice your dataframe you can do this....

In [62]:
# Slice rows/cols
mydf = df.limit(300)
mydf = mydf.select(mydf.columns[0:5])

print('size before slicing:', df.toPandas().shape)
print('size after slicing:', mydf.toPandas().shape)

size before slicing: (1000, 8)
size after slicing: (300, 5)


## Filtering Data

A large part of working with DataFrames is the ability to quickly filter out data based on conditions. Spark DataFrames are built on top of the Spark SQL platform, which means that is you already know SQL, you can quickly and easily grab that data using SQL commands, or using the DataFram methods (which is what we focus on in this course).

In [64]:
fifa = spark.read.csv(path+'fifa19.csv', inferSchema=True,header=True)
fifa.filter("Overall>50").limit(4).toPandas()

In [66]:
# Using SQL with .select()
fifa.filter("Overall>50").select(['ID','Name','Nationality','Overall']).limit(4).toPandas()

Unnamed: 0,ID,Name,Nationality,Overall
0,158023,L. Messi,Argentina,94
1,20801,Cristiano Ronaldo,Portugal,94
2,190871,Neymar Jr,Brazil,92
3,193080,De Gea,Spain,91


### Collecting Results as Python Objects

In [67]:
# Collecting results as Python objects: you need the ".collect()" call at the end to "collect" the results
result = fifa\
    .select(['Nationality','Name','Age','Overall'])\
    .filter("Overall>70")\
    .orderBy(fifa["Overall"].desc())\
    .collect()

In [75]:
print("Best Player Over Score 70:",result[0][1])

Best Player Over Score 70: L. Messi


Rows can also be called to turn into dictionaries if needed

In [77]:
row = result[0]
row.asDict()

{'Nationality': 'Argentina', 'Name': 'L. Messi', 'Age': 31, 'Overall': 94}

In [78]:
for item in result[0]:
    print(item)

Argentina
L. Messi
31
94


## SQL Options in Spark

### Spark SQL

Spark TempView provides two functions that allow users to run SQL queries against a Spark DataFrame:

createOrReplaceTempView: The lifetime of this temporary view is tied to the [[SparkSession]] that was used to create this Dataset. It creates (or replaces if that view name already exists) a lazily evaluated "view" that you can then use like a hive table in Spark SQL. It does not persist to memory unless you cache the dataset that underpins the view.

createGlobalTempView: The lifetime of this temporary view is tied to this Spark application.

In [85]:
df = spark.read.csv(path+'rec-crime-pfa.csv', inferSchema=True,header=True)
df = df.withColumnRenamed('12 months ending', '12mo')
df = df.withColumnRenamed('Rolling year total number of offences', 'Count')
df.show(4)

+----------+-----------------+----------+--------------------+-----+
|      12mo|              PFA|    Region|             Offence|Count|
+----------+-----------------+----------+--------------------+-----+
|31/03/2003|Avon and Somerset|South West|All other theft o...|25959|
|31/03/2003|Avon and Somerset|South West|       Bicycle theft| 3090|
|31/03/2003|Avon and Somerset|South West|Criminal damage a...|26202|
|31/03/2003|Avon and Somerset|South West|Death or serious ...|    2|
+----------+-----------------+----------+--------------------+-----+
only showing top 4 rows



In [86]:
# Create a temporary view of the dataframe
df.createOrReplaceTempView("tempview")

In [88]:
# Then Query the temp view
spark.sql("SELECT * FROM tempview WHERE Count > 1000").limit(5).toPandas()

Unnamed: 0,12mo,PFA,Region,Offence,Count
0,31/03/2003,Avon and Somerset,South West,All other theft offences,25959
1,31/03/2003,Avon and Somerset,South West,Bicycle theft,3090
2,31/03/2003,Avon and Somerset,South West,Criminal damage and arson,26202
3,31/03/2003,Avon and Somerset,South West,Domestic burglary,14561
4,31/03/2003,Avon and Somerset,South West,Drug offences,2308


In [89]:
# Or pass it to an object
sql_results = spark.sql("SELECT * FROM tempview WHERE Count > 1000 AND Region='South West'")
sql_results.limit(5).toPandas()

Unnamed: 0,12mo,PFA,Region,Offence,Count
0,31/03/2003,Avon and Somerset,South West,All other theft offences,25959
1,31/03/2003,Avon and Somerset,South West,Bicycle theft,3090
2,31/03/2003,Avon and Somerset,South West,Criminal damage and arson,26202
3,31/03/2003,Avon and Somerset,South West,Domestic burglary,14561
4,31/03/2003,Avon and Somerset,South West,Drug offences,2308


In [98]:
sqlstr = "SELECT Region, sum(Count) AS Total FROM tempview GROUP BY Region"
spark.sql(sqlstr).limit(5).toPandas()

Unnamed: 0,Region,Total
0,Fraud: CIFAS,7678981
1,North West,30235732
2,British Transport Police,3029117
3,Wales,11137260
4,London,42691902


### SQL Transformer

You also have the option to use the SQL transformer option where you can write freeform SQL scripts.

In [95]:
# First we need to import SQL transformer
from pyspark.ml.feature import SQLTransformer

In [102]:
# __THIS__ is keyword for table used by transformer
sqlTrans = SQLTransformer(statement="SELECT Region, Offence FROM __THIS__")
sqlTrans.transform(df).show(5)

+----------+--------------------+
|    Region|             Offence|
+----------+--------------------+
|South West|All other theft o...|
|South West|       Bicycle theft|
|South West|Criminal damage a...|
|South West|Death or serious ...|
|South West|   Domestic burglary|
+----------+--------------------+
only showing top 5 rows



In [104]:
# group by using sql transformer
sqlTrans = SQLTransformer(statement="SELECT Offence, SUM(Count) as Total FROM __THIS__ GROUP BY Offence")
sqlTrans.transform(df).show(5)

+--------------------+--------+
|             Offence|   Total|
+--------------------+--------+
|Public order offe...|10925676|
|       Bicycle theft| 5297006|
|Residential burglary| 1671469|
|Violence without ...|16590158|
|All other theft o...|30979393|
+--------------------+--------+
only showing top 5 rows



# GroupBy and Aggregate Functions


In [112]:
custom_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("host_id", IntegerType(), True),
    StructField("host_name", StringType(), True),
    StructField("neighbourhood_group", StringType(), True),
    StructField("neighbourhood", StringType(), True),
    StructField("latitude", DecimalType(), True),
    StructField("longitude", DecimalType(), True),
    StructField("room_type", StringType(), True),
    StructField("price", DecimalType(), True),
    StructField("minimum_nights", IntegerType(), True),
    StructField("number_of_reviews", IntegerType(), True),
    StructField("last_review", DateType(), True),
    StructField("reviews_per_month", DecimalType(), True),
    StructField("calculated_host_listings_count", IntegerType(), True),
    StructField("availability_365", IntegerType(), True),
])
df = spark.read.format("csv") \
    .schema(custom_schema) \
    .option("header", True) \
    .load(path+'nyc_air_bnb.csv')
df.toPandas().head(5)

Unnamed: 0,id,name,host_id,host_name,neighbourhood_group,neighbourhood,latitude,longitude,room_type,price,minimum_nights,number_of_reviews,last_review,reviews_per_month,calculated_host_listings_count,availability_365
0,2539.0,Clean & quiet apt home by the park,2787.0,John,Brooklyn,Kensington,41,-74,Private room,149,1.0,9.0,2018-10-19,0.0,6.0,365.0
1,2595.0,Skylit Midtown Castle,2845.0,Jennifer,Manhattan,Midtown,41,-74,Entire home/apt,225,1.0,45.0,2019-05-21,0.0,2.0,355.0
2,3647.0,THE VILLAGE OF HARLEM....NEW YORK !,4632.0,Elisabeth,Manhattan,Harlem,41,-74,Private room,150,3.0,0.0,,,1.0,365.0
3,3831.0,Cozy Entire Floor of Brownstone,4869.0,LisaRoxanne,Brooklyn,Clinton Hill,41,-74,Entire home/apt,89,1.0,270.0,2019-07-05,5.0,1.0,194.0
4,5022.0,Entire Apt: Spacious Studio/Loft by central park,7192.0,Laura,Manhattan,East Harlem,41,-74,Entire home/apt,80,10.0,9.0,2018-11-19,0.0,1.0,0.0


In [113]:
# Groupby Function with count (you can also use sum, min, max)
df.groupBy("neighbourhood_group").count().show(100)

+--------------------+-----+
| neighbourhood_group|count|
+--------------------+-----+
|          Douglaston|    1|
|              Queens| 5630|
|               Nadia|    1|
|             Midtown|    4|
|     Jackson Heights|    2|
|      Hell's Kitchen|    7|
|   Greenwich Village|    2|
|        Clinton Hill|    1|
|  Washington Heights|    4|
|    Ditmars Steinway|    3|
|            Longwood|    2|
|           Briarwood|    1|
|         Little Neck|    1|
|            Flushing|    3|
|       Randall Manor|    1|
|              Carmen|    1|
|       East Elmhurst|    2|
|     Upper East Side|    7|
|                null|  185|
|          Bath Beach|    1|
|            Canarsie|    4|
|              Evelyn|    1|
|         East Harlem|    5|
|             Astoria|    2|
|        East Village|    4|
|         Fort Greene|    1|
|          Mott Haven|    5|
|            Gramercy|    1|
|        Williamsburg|    6|
|                   D|    1|
|         Throgs Neck|    1|
|            E

In [114]:
# Then you can add the following aggregate functions: mean, count, min, max, sum
df.groupBy("neighbourhood_group").mean("price").show()

+-------------------+----------+
|neighbourhood_group|avg(price)|
+-------------------+----------+
|         Douglaston|    1.0000|
|             Queens|   99.5769|
|              Nadia|      null|
|            Midtown|    9.0000|
|    Jackson Heights|   16.0000|
|     Hell's Kitchen|    1.2857|
|  Greenwich Village|   55.5000|
|       Clinton Hill|   14.0000|
| Washington Heights|    2.7500|
|   Ditmars Steinway|    3.3333|
|           Longwood|    5.0000|
|          Briarwood|    1.0000|
|        Little Neck|    1.0000|
|           Flushing|   10.3333|
|      Randall Manor|    7.0000|
|             Carmen|      null|
|      East Elmhurst|    1.0000|
|    Upper East Side|    1.5714|
|               null|      null|
|         Bath Beach|    2.0000|
+-------------------+----------+
only showing top 20 rows



In [115]:
# This is also a pretty neat function you can use:
summary = df.summary("count", "min", "25%", "75%", "max")
summary.toPandas()

# or a prettier version
limit_summary = df.select("price","minimum_nights","number_of_reviews","last_review","reviews_per_month","calculated_host_listings_count","availability_365").summary("count","min","max")
limit_summary.toPandas()

Unnamed: 0,summary,price,minimum_nights,number_of_reviews,reviews_per_month,calculated_host_listings_count,availability_365
0,count,48887,48891,48737,38858,48887,48737
1,min,-74,0,0,0,0,0
2,max,10000,1250,629,59,365,365


In [116]:
# Here's another way of doing it
df.select(countDistinct("neighbourhood_group"),avg('price'),stddev("price")).show()

+-----------------------------------+----------+------------------+
|count(DISTINCT neighbourhood_group)|avg(price)|stddev_samp(price)|
+-----------------------------------+----------+------------------+
|                                 77|  152.2230|238.54148624491813|
+-----------------------------------+----------+------------------+



**Aggregate on the entire DataFrame without groups (shorthand for df.groupBy.agg()).**

In [117]:
from pyspark.sql import functions as F
df.agg(F.min(df.price).alias("Min Price")).show()

+---------+
|Min Price|
+---------+
|      -74|
+---------+



In [118]:
# Max sales across everything
df.agg({'number_of_reviews':'max'}).withColumnRenamed("max(number_of_reviews)", "Max Reviews").show()

+-----------+
|Max Reviews|
+-----------+
|        629|
+-----------+



In [119]:
# And then if you want to group by you can do this:
df.groupBy("neighbourhood").agg({'number_of_reviews':'max'}).show()

+-----------------+----------------------+
|    neighbourhood|max(number_of_reviews)|
+-----------------+----------------------+
|           Corona|                   166|
|     Richmondtown|                    79|
|     Prince's Bay|                    15|
|      Westerleigh|                    17|
|       Mill Basin|                    36|
|         40.76199|                  null|
|     Civic Center|                   319|
|         40.83166|                  null|
|       Douglaston|                    49|
|       Mount Hope|                    80|
|          40.7578|                  null|
|         40.80958|                  null|
|      Marble Hill|                    85|
|        Rego Park|                   175|
|         40.81225|                  null|
|         40.76805|                  null|
|         40.64936|                  null|
|    Dyker Heights|                    69|
|         40.76364|                  null|
|Kew Gardens Hills|                    82|
+----------

**Pivot Function**

Provides a two way table

In [120]:
# Pivot Function
df\
    .filter("room_type='Shared room'")\
    .groupBy("room_type")\
    .pivot("neighbourhood_group", ["Queens", "Brooklyn"])\
    .count()\
    .show(100)

+-----------+------+--------+
|  room_type|Queens|Brooklyn|
+-----------+------+--------+
|Shared room|   198|     413|
+-----------+------+--------+



## Joining and Appending DataFrames in PySpark

In [123]:
valuesA = [('Pirate',1,'Arrrg'),('Monkey',2,'Oooo'),('Ninja',3,'Yaaaa'),('Spaghetti',4,'Slurp!')]
TableA = spark.createDataFrame(valuesA,['name','id','sound'])

valuesB = [('Rutabaga',1,2),('Pirate',2,45),('Ninja',3,102),('Darth Vader',4,87)]
TableB = spark.createDataFrame(valuesB,['name','id','age'])

print("This is TableA")
TableA.show()
print("And this is TableB")
TableB.show()

This is TableA
+---------+---+------+
|     name| id| sound|
+---------+---+------+
|   Pirate|  1| Arrrg|
|   Monkey|  2|  Oooo|
|    Ninja|  3| Yaaaa|
|Spaghetti|  4|Slurp!|
+---------+---+------+

And this is TableB
+-----------+---+---+
|       name| id|age|
+-----------+---+---+
|   Rutabaga|  1|  2|
|     Pirate|  2| 45|
|      Ninja|  3|102|
|Darth Vader|  4| 87|
+-----------+---+---+



## Appends

In [129]:
print("TableA count:", TableA.count())

# append table A with itself, through UNION (all)
new_df = TableA
df_concat = TableA.union(new_df)
print("TableA union itself Counts:", df_concat.count(), len(df_concat.columns))

TableA.show(100)
df_concat.show(100)

TableA count: 4
TableA union itself Counts: 8 3
+---------+---+------+
|     name| id| sound|
+---------+---+------+
|   Pirate|  1| Arrrg|
|   Monkey|  2|  Oooo|
|    Ninja|  3| Yaaaa|
|Spaghetti|  4|Slurp!|
+---------+---+------+

+---------+---+------+
|     name| id| sound|
+---------+---+------+
|   Pirate|  1| Arrrg|
|   Monkey|  2|  Oooo|
|    Ninja|  3| Yaaaa|
|Spaghetti|  4|Slurp!|
|   Pirate|  1| Arrrg|
|   Monkey|  2|  Oooo|
|    Ninja|  3| Yaaaa|
|Spaghetti|  4|Slurp!|
+---------+---+------+



## Joins!

All options:

In [130]:
inner_join = TableA.join(TableB, ["name","id"],"inner")
print("Inner Join Example:")
inner_join.show()

left_join = TableA.join(TableB, ["name","id"], how='left') # Could also use 'left_outer'
print("Left Join Example:")
left_join.show()

conditional_join = TableA.join(TableB, ["name","id"], how='left').filter(TableB.name.isNull())
print("Conditional Left Join:")
conditional_join.show()

right_join = TableA.join(TableB,  ["name","id"],how='right') # Could also use 'right_outer'
print("Right Join:")
right_join.show()

full_outer_join = TableA.join(TableB, ["name","id"],how='full') # Could also use 'full_outer'
print("Full Outer Join:")
full_outer_join.show()

Inner Join Example
+-----+---+-----+---+
| name| id|sound|age|
+-----+---+-----+---+
|Ninja|  3|Yaaaa|102|
+-----+---+-----+---+

None
Left Join Example
+---------+---+------+----+
|     name| id| sound| age|
+---------+---+------+----+
|   Pirate|  1| Arrrg|null|
|    Ninja|  3| Yaaaa| 102|
|   Monkey|  2|  Oooo|null|
|Spaghetti|  4|Slurp!|null|
+---------+---+------+----+

None
Conditional Left Join
+---------+---+------+----+
|     name| id| sound| age|
+---------+---+------+----+
|   Pirate|  1| Arrrg|null|
|   Monkey|  2|  Oooo|null|
|Spaghetti|  4|Slurp!|null|
+---------+---+------+----+

None
Right Join
+-----------+---+-----+---+
|       name| id|sound|age|
+-----------+---+-----+---+
|Darth Vader|  4| null| 87|
|      Ninja|  3|Yaaaa|102|
|   Rutabaga|  1| null|  2|
|     Pirate|  2| null| 45|
+-----------+---+-----+---+

None
Full Outer Join
+-----------+---+------+----+
|       name| id| sound| age|
+-----------+---+------+----+
|     Pirate|  1| Arrrg|null|
|Darth Vader|  4

## Handling Missing Data

In [134]:
df = spark.read.csv(path+'zomato.csv', inferSchema=True,header=True)
#df = df.withColumnRenamed('12 months ending', '12mo')
df.toPandas().head(5)

Unnamed: 0,url,address,name,online_order,book_table,rate,votes,phone,location,rest_type,dish_liked,cuisines,approx_cost(for two people),reviews_list,menu_item,listed_in(type),listed_in(city)
0,https://www.zomato.com/bangalore/jalsa-banasha...,"942, 21st Main Road, 2nd Stage, Banashankari, ...",Jalsa,Yes,Yes,4.1/5,775,080 42297555,,,,,,,,,
1,"+91 9743772233""",Banashankari,Casual Dining,"Pasta, Lunch Buffet, Masala Papad, Paneer Laja...","North Indian, Mughlai, Chinese",800,"""[('Rated 4.0', 'RATED\n A beautiful place to...",('Rated 4.0','RATED\n You canÃ\x83Ã\x83Ã\x82Ã\x82Ã\x...,('Rated 5.0','RATED\n Overdelighted by the service and fo...,('Rated 4.0','RATED\n The place is nice and comfortable. ...,('Rated 4.0','RATED\n The place is nice and comfortable. ...,('Rated 4.0','RATED\n The place is nice and comfortable. ...
2,https://www.zomato.com/bangalore/spice-elephan...,"2nd Floor, 80 Feet Road, Near Big Bazaar, 6th ...",Spice Elephant,Yes,No,4.1/5,787,080 41714161,Banashankari,Casual Dining,"Momos, Lunch Buffet, Chocolate Nirvana, Thai G...","Chinese, North Indian, Thai",800,"""[('Rated 4.0', 'RATED\n Had been here for di...",rice was well cooked and overall was great\n\n...,('Rated 5.0','RATED\n This place just cool ? with good am...
3,https://www.zomato.com/SanchurroBangalore?cont...,"1112, Next to KIMS Medical College, 17th Cross...",San Churro Cafe,Yes,No,3.8/5,918,+91 9663487993,Banashankari,"Cafe, Casual Dining","Churros, Cannelloni, Minestrone Soup, Hot Choc...","Cafe, Mexican, Italian",800,"""[('Rated 3.0', """"RATED\n Ambience is not tha...",('Rated 3.0',"""""RATED\n \nWent there for a quick bite with ...",pasta churros and lasagne.\n\nNachos were pat...
4,https://www.zomato.com/bangalore/addhuri-udupi...,"1st Floor, Annakuteera, 3rd Stage, Banashankar...",Addhuri Udupi Bhojana,No,No,3.7/5,88,+91 9620009302,Banashankari,Quick Bites,Masala Dosa,"South Indian, North Indian",300,"""[('Rated 4.0', """"RATED\n Great food and prop...",('Rated 2.0','RATED\n Reached the place at 3pm on Saturda...,('Rated 4.0'


In [135]:
from pyspark.sql import functions as F

In [136]:
# find where cuisines is null
df.filter(df.cuisines.isNull()).select(['name','cuisines']).show()

+--------------------+--------+
|                name|cuisines|
+--------------------+--------+
|               Jalsa|    null|
|       Grand Village|    null|
|       Casual Dining|    null|
|     Timepass Dinner|    null|
|       Casual Dining|    null|
|Rosewood Internat...|    null|
|       Casual Dining|    null|
|              Onesta|    null|
|      Penthouse Cafe|    null|
|           Smacznego|    null|
|CafÃÂÃÂÃÂÃ...|    null|
|       Cafe Vivacity|    null|
|        Catch-up-ino|    null|
|    Kirthi's Biryani|    null|
|    The Vintage Cafe|    null|
|        My Tea House|    null|
|    Srinathji's Cafe|    null|
| Casual Dining, Cafe|    null|
|     Behrouz Biryani|    null|
|     Szechuan Dragon|    null|
+--------------------+--------+
only showing top 20 rows



**Missing Data Statistics**

In [137]:
from pyspark.sql.functions import isnan, when, count, col

nulls = df.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df.columns])
percent = df.select([format_number(((count(when(isnan(c) | col(c).isNull(), c))/df.count())*100),1).alias(c) for c in df.columns])
result = nulls.union(percent)
result.toPandas()

Unnamed: 0,url,address,name,online_order,book_table,rate,votes,phone,location,rest_type,dish_liked,cuisines,approx_cost(for two people),reviews_list,menu_item,listed_in(type),listed_in(city)
0,0.0,0.0,85.0,8111.0,2.0,7775.0,0.0,1227.0,20054.0,20165.0,46841.0,27305.0,28143.0,28185.0,28611.0,28983.0,29344.0
1,0.0,0.0,0.1,11.3,0.0,10.8,0.0,1.7,28.0,28.1,65.3,38.1,39.2,39.3,39.9,40.4,40.9


In [138]:
from pyspark.sql.functions import *

def null_value_count(df):
    null_columns_counts = []
    numRows = df.count()
    for k in df.columns:
        nullRows = df.where(col(k).isNull()).count()
        if(nullRows > 0):
            temp = k,nullRows
            null_columns_counts.append(temp)
    return(null_columns_counts)

null_columns_count_list = null_value_count(df)
spark.createDataFrame(null_columns_count_list, ['Column_With_Null_Value', 'Null_Values_Count']).show()

+----------------------+-----------------+
|Column_With_Null_Value|Null_Values_Count|
+----------------------+-----------------+
|                  name|               85|
|          online_order|             8111|
|            book_table|                2|
|                  rate|             7775|
|                 phone|             1227|
|              location|            20054|
|             rest_type|            20165|
|            dish_liked|            46841|
|              cuisines|            27305|
|  approx_cost(for t...|            28143|
|          reviews_list|            28185|
|             menu_item|            28611|
|       listed_in(type)|            28983|
|       listed_in(city)|            29344|
+----------------------+-----------------+



**Drop all missing data**

PySpark has a really handy .na function for working with missing data. The drop command has the following parameters:

    df.na.drop(how='any', thresh=None, subset=None)

In [139]:
df.na.drop().limit(4).toPandas() 

Unnamed: 0,url,address,name,online_order,book_table,rate,votes,phone,location,rest_type,dish_liked,cuisines,approx_cost(for two people),reviews_list,menu_item,listed_in(type),listed_in(city)
0,"+91 9743772233""",Banashankari,Casual Dining,"Pasta, Lunch Buffet, Masala Papad, Paneer Laja...","North Indian, Mughlai, Chinese",800,"""[('Rated 4.0', 'RATED\n A beautiful place to...",('Rated 4.0','RATED\n You canÃ\x83Ã\x83Ã\x82Ã\x82Ã\x...,('Rated 5.0','RATED\n Overdelighted by the service and fo...,('Rated 4.0','RATED\n The place is nice and comfortable. ...,('Rated 4.0','RATED\n The place is nice and comfortable. ...,('Rated 4.0','RATED\n The place is nice and comfortable. ...
1,https://www.zomato.com/bangalore/spice-elephan...,"2nd Floor, 80 Feet Road, Near Big Bazaar, 6th ...",Spice Elephant,Yes,No,4.1/5,787,080 41714161,Banashankari,Casual Dining,"Momos, Lunch Buffet, Chocolate Nirvana, Thai G...","Chinese, North Indian, Thai",800,"""[('Rated 4.0', 'RATED\n Had been here for di...",rice was well cooked and overall was great\n\n...,('Rated 5.0','RATED\n This place just cool ? with good am...
2,https://www.zomato.com/SanchurroBangalore?cont...,"1112, Next to KIMS Medical College, 17th Cross...",San Churro Cafe,Yes,No,3.8/5,918,+91 9663487993,Banashankari,"Cafe, Casual Dining","Churros, Cannelloni, Minestrone Soup, Hot Choc...","Cafe, Mexican, Italian",800,"""[('Rated 3.0', """"RATED\n Ambience is not tha...",('Rated 3.0',"""""RATED\n \nWent there for a quick bite with ...",pasta churros and lasagne.\n\nNachos were pat...
3,https://www.zomato.com/bangalore/addhuri-udupi...,"1st Floor, Annakuteera, 3rd Stage, Banashankar...",Addhuri Udupi Bhojana,No,No,3.7/5,88,+91 9620009302,Banashankari,Quick Bites,Masala Dosa,"South Indian, North Indian",300,"""[('Rated 4.0', """"RATED\n Great food and prop...",('Rated 2.0','RATED\n Reached the place at 3pm on Saturda...,('Rated 4.0'


In [140]:
# Of course you will want to know how many rows that affected before you actually execute it..
og_len = df.count()
drop_len = df.na.drop().count()
print("Total Rows Dropped:",og_len-drop_len)
print("Percentage of Rows Dropped", (og_len-drop_len)/og_len)

Total Rows Dropped: 52402
Percentage of Rows Dropped 0.7305450996793531


In [141]:
# Drop rows that have at least 8 NON-null values
og_len = df.count()
drop_len = df.na.drop(thresh=8).count()
print("Total Rows Dropped:",og_len-drop_len)
print("Percentage of Rows Dropped", (og_len-drop_len)/og_len)

Total Rows Dropped: 1669
Percentage of Rows Dropped 0.0232678098424648


In [142]:
# Only drop the rows whose values in the sales column are null
og_len = df.count()
drop_len = df.na.drop(subset=["rate"]).count() 
print("Total Rows Dropped:",og_len-drop_len)
print("Percentage of Rows Dropped", (og_len-drop_len)/og_len)

Total Rows Dropped: 7775
Percentage of Rows Dropped 0.10839258329848041


In [144]:
# Another way to do the above
og_len = df.count()
drop_len = df.filter(df.rate.isNotNull()).count() 
print("Total Rows Dropped:",og_len-drop_len)
print("Percentage of Rows Dropped", (og_len-drop_len)/og_len)

Total Rows Dropped: 7775
Percentage of Rows Dropped 0.10839258329848041


In [145]:
# Drop a row only if ALL its values are null.
og_len = df.count()
drop_len = df.na.drop(how='all').count() 
print("Total Rows Dropped:",og_len-drop_len)
print("Percentage of Rows Dropped", (og_len-drop_len)/og_len)

Total Rows Dropped: 0
Percentage of Rows Dropped 0.0


### Fill the missing values

We can also fill the missing values with new values. If you have multiple nulls across multiple data types, Spark is actually smart enough to match up the data types. For example:

In [146]:
# Fill all nulls values with one common value (character value)
df.na.fill('MISSING').limit(4).toPandas()

Unnamed: 0,url,address,name,online_order,book_table,rate,votes,phone,location,rest_type,dish_liked,cuisines,approx_cost(for two people),reviews_list,menu_item,listed_in(type),listed_in(city)
0,https://www.zomato.com/bangalore/jalsa-banasha...,"942, 21st Main Road, 2nd Stage, Banashankari, ...",Jalsa,Yes,Yes,4.1/5,775,080 42297555,MISSING,MISSING,MISSING,MISSING,MISSING,MISSING,MISSING,MISSING,MISSING
1,"+91 9743772233""",Banashankari,Casual Dining,"Pasta, Lunch Buffet, Masala Papad, Paneer Laja...","North Indian, Mughlai, Chinese",800,"""[('Rated 4.0', 'RATED\n A beautiful place to...",('Rated 4.0','RATED\n You canÃ\x83Ã\x83Ã\x82Ã\x82Ã\x...,('Rated 5.0','RATED\n Overdelighted by the service and fo...,('Rated 4.0','RATED\n The place is nice and comfortable. ...,('Rated 4.0','RATED\n The place is nice and comfortable. ...,('Rated 4.0','RATED\n The place is nice and comfortable. ...
2,https://www.zomato.com/bangalore/spice-elephan...,"2nd Floor, 80 Feet Road, Near Big Bazaar, 6th ...",Spice Elephant,Yes,No,4.1/5,787,080 41714161,Banashankari,Casual Dining,"Momos, Lunch Buffet, Chocolate Nirvana, Thai G...","Chinese, North Indian, Thai",800,"""[('Rated 4.0', 'RATED\n Had been here for di...",rice was well cooked and overall was great\n\n...,('Rated 5.0','RATED\n This place just cool ? with good am...
3,https://www.zomato.com/SanchurroBangalore?cont...,"1112, Next to KIMS Medical College, 17th Cross...",San Churro Cafe,Yes,No,3.8/5,918,+91 9663487993,Banashankari,"Cafe, Casual Dining","Churros, Cannelloni, Minestrone Soup, Hot Choc...","Cafe, Mexican, Italian",800,"""[('Rated 3.0', """"RATED\n Ambience is not tha...",('Rated 3.0',"""""RATED\n \nWent there for a quick bite with ...",pasta churros and lasagne.\n\nNachos were pat...


In [147]:
# Fill all nulls values with one common value (numeric value)
df.na.fill(999).limit(10).toPandas()

Unnamed: 0,url,address,name,online_order,book_table,rate,votes,phone,location,rest_type,dish_liked,cuisines,approx_cost(for two people),reviews_list,menu_item,listed_in(type),listed_in(city)
0,https://www.zomato.com/bangalore/jalsa-banasha...,"942, 21st Main Road, 2nd Stage, Banashankari, ...",Jalsa,Yes,Yes,4.1/5,775,080 42297555,,,,,,,,,
1,"+91 9743772233""",Banashankari,Casual Dining,"Pasta, Lunch Buffet, Masala Papad, Paneer Laja...","North Indian, Mughlai, Chinese",800,"""[('Rated 4.0', 'RATED\n A beautiful place to...",('Rated 4.0','RATED\n You canÃ\x83Ã\x83Ã\x82Ã\x82Ã\x...,('Rated 5.0','RATED\n Overdelighted by the service and fo...,('Rated 4.0','RATED\n The place is nice and comfortable. ...,('Rated 4.0','RATED\n The place is nice and comfortable. ...,('Rated 4.0','RATED\n The place is nice and comfortable. ...
2,https://www.zomato.com/bangalore/spice-elephan...,"2nd Floor, 80 Feet Road, Near Big Bazaar, 6th ...",Spice Elephant,Yes,No,4.1/5,787,080 41714161,Banashankari,Casual Dining,"Momos, Lunch Buffet, Chocolate Nirvana, Thai G...","Chinese, North Indian, Thai",800,"""[('Rated 4.0', 'RATED\n Had been here for di...",rice was well cooked and overall was great\n\n...,('Rated 5.0','RATED\n This place just cool ? with good am...
3,https://www.zomato.com/SanchurroBangalore?cont...,"1112, Next to KIMS Medical College, 17th Cross...",San Churro Cafe,Yes,No,3.8/5,918,+91 9663487993,Banashankari,"Cafe, Casual Dining","Churros, Cannelloni, Minestrone Soup, Hot Choc...","Cafe, Mexican, Italian",800,"""[('Rated 3.0', """"RATED\n Ambience is not tha...",('Rated 3.0',"""""RATED\n \nWent there for a quick bite with ...",pasta churros and lasagne.\n\nNachos were pat...
4,https://www.zomato.com/bangalore/addhuri-udupi...,"1st Floor, Annakuteera, 3rd Stage, Banashankar...",Addhuri Udupi Bhojana,No,No,3.7/5,88,+91 9620009302,Banashankari,Quick Bites,Masala Dosa,"South Indian, North Indian",300,"""[('Rated 4.0', """"RATED\n Great food and prop...",('Rated 2.0','RATED\n Reached the place at 3pm on Saturda...,('Rated 4.0'
5,https://www.zomato.com/bangalore/grand-village...,"10, 3rd Floor, Lakshmi Associates, Gandhi Baza...",Grand Village,No,No,3.8/5,166,+91 8026612447,,,,,,,,,
6,"+91 9901210005""",Basavanagudi,Casual Dining,"Panipuri, Gol Gappe","North Indian, Rajasthani",600,"[('Rated 4.0', 'RATED\n Very good restaurant ...",[],Buffet,Banashankari,,,,,,,
7,https://www.zomato.com/bangalore/timepass-dinn...,"37, 5-1, 4th Floor, Bosco Court, Gandhi Bazaar...",Timepass Dinner,Yes,No,3.8/5,286,+91 9980040002,,,,,,,,,
8,"+91 9980063005""",Basavanagudi,Casual Dining,"Onion Rings, Pasta, Kadhai Paneer, Salads, Sal...",North Indian,600,"[('Rated 3.0', 'RATED\n Food 3/5\nAmbience 3/...",[],Buffet,Banashankari,,,,,,,
9,https://www.zomato.com/bangalore/rosewood-inte...,"19/1, New Timberyard Layout, Beside Satellite ...",Rosewood International Hotel - Bar & Restaurant,No,No,3.6/5,8,+91 9731716688,,,,,,,,,


Usually you should specify what columns you want to fill with the subset parameter

In [148]:
df.filter(df.name.isNull()).na.fill('No Name',subset=['name']).limit(5).toPandas()

Unnamed: 0,url,address,name,online_order,book_table,rate,votes,phone,location,rest_type,dish_liked,cuisines,approx_cost(for two people),reviews_list,menu_item,listed_in(type),listed_in(city)
0,"+91 9986692090""",BTM,No Name,"Momos, Oreo Shake","Mughlai, North Indian, Chinese, Momos",600,"""[('Rated 5.0', """"RATED\n Ordered Chicken Kad...",('Rated 3.0','RATED\n Simple food with great north indian...,"['Fry Chicken Kabab [5 Pieces]', 'Fry Chicken ...",Delivery,Bannerghatta Road,,,,,
1,"00 805074123""",BTM,No Name,,"North Indian, Chinese, Arabian",700,"""[('Rated 2.0', 'RATED\n You would only go to...",[],Delivery,Bannerghatta Road,,,,,,,
2,"+91 8971051846""",Bannerghatta Road,No Name,,"Street Food, Burger",150,[],[],Delivery,Bannerghatta Road,,,,,,,
3,"080 39457777""",Bannerghatta Road,No Name,"Chicken Biryani, Hyderabadi Biryani, Rolls, Mu...","Biryani, North Indian",500,"""[('Rated 3.0', 'RATED\n If you a spicy biriy...",('Rated 3.0','RATED\n too much oil in rice items'),('Rated 2.0','RATED\n salan was not provided'),('Rated 1.0','RATED\n I ordered aam ras n received nimbu ...,('Rated 3.0','RATED\n ok ok biryani'),('Rated 1.0',"""""RATED\n poor test & quality... will never ..."
4,"+91 8971051846""",Bannerghatta Road,No Name,,"Street Food, Burger",150,[],[],Dine-out,Bannerghatta Road,,,,,,,


A very common practice is to fill values with the mean value for the column. Here is a fun function to that in an automatted way.

In [149]:
def fill_with_mean(df, include=set()): 
    stats = df.agg(*(
        avg(c).alias(c) for c in df.columns if c in include
    ))
#     stats = stats.select(*(col(c).cast("int").alias(c) for c in stats.columns)) #IntegerType()
    return df.na.fill(stats.first().asDict())

updated_df = fill_with_mean(df, ["approx_cost(for two people)","votes"])
updated_df.limit(5).toPandas()

Unnamed: 0,url,address,name,online_order,book_table,rate,votes,phone,location,rest_type,dish_liked,cuisines,approx_cost(for two people),reviews_list,menu_item,listed_in(type),listed_in(city)
0,https://www.zomato.com/bangalore/jalsa-banasha...,"942, 21st Main Road, 2nd Stage, Banashankari, ...",Jalsa,Yes,Yes,4.1/5,775,080 42297555,,,,,387.40837156371134,,,,
1,"+91 9743772233""",Banashankari,Casual Dining,"Pasta, Lunch Buffet, Masala Papad, Paneer Laja...","North Indian, Mughlai, Chinese",800,"""[('Rated 4.0', 'RATED\n A beautiful place to...",('Rated 4.0','RATED\n You canÃ\x83Ã\x83Ã\x82Ã\x82Ã\x...,('Rated 5.0','RATED\n Overdelighted by the service and fo...,('Rated 4.0','RATED\n The place is nice and comfortable. ...,('Rated 4.0','RATED\n The place is nice and comfortable. ...,('Rated 4.0','RATED\n The place is nice and comfortable. ...
2,https://www.zomato.com/bangalore/spice-elephan...,"2nd Floor, 80 Feet Road, Near Big Bazaar, 6th ...",Spice Elephant,Yes,No,4.1/5,787,080 41714161,Banashankari,Casual Dining,"Momos, Lunch Buffet, Chocolate Nirvana, Thai G...","Chinese, North Indian, Thai",800,"""[('Rated 4.0', 'RATED\n Had been here for di...",rice was well cooked and overall was great\n\n...,('Rated 5.0','RATED\n This place just cool ? with good am...
3,https://www.zomato.com/SanchurroBangalore?cont...,"1112, Next to KIMS Medical College, 17th Cross...",San Churro Cafe,Yes,No,3.8/5,918,+91 9663487993,Banashankari,"Cafe, Casual Dining","Churros, Cannelloni, Minestrone Soup, Hot Choc...","Cafe, Mexican, Italian",800,"""[('Rated 3.0', """"RATED\n Ambience is not tha...",('Rated 3.0',"""""RATED\n \nWent there for a quick bite with ...",pasta churros and lasagne.\n\nNachos were pat...
4,https://www.zomato.com/bangalore/addhuri-udupi...,"1st Floor, Annakuteera, 3rd Stage, Banashankar...",Addhuri Udupi Bhojana,No,No,3.7/5,88,+91 9620009302,Banashankari,Quick Bites,Masala Dosa,"South Indian, North Indian",300,"""[('Rated 4.0', """"RATED\n Great food and prop...",('Rated 2.0','RATED\n Reached the place at 3pm on Saturda...,('Rated 4.0'


## Manipulating Data in DataFrames

Change data types

### Available types:
    - DataType
    - NullType
    - StringType
    - BinaryType
    - BooleanType
    - DateType
    - TimestampType
    - DecimalType
    - DoubleType
    - FloatType
    - ByteType
    - IntegerType
    - LongType
    - ShortType
    - ArrayType
    - MapType
    - StructField
    - StructType

In [152]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

df = spark.read.csv(path+'youtubevideos.csv', inferSchema=True,header=True)
print(df.printSchema())
df.limit(4).toPandas()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: string (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- dislikes: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)

None


Unnamed: 0,video_id,trending_date,title,channel_title,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,2kyS6SvSYSE,17.14.11,WE WANT TO TALK ABOUT OUR MARRIAGE,CaseyNeistat,22,2017-11-13T17:13:01.000Z,SHANtell martin,748374,57527,2966,15954,https://i.ytimg.com/vi/2kyS6SvSYSE/default.jpg,False,False,False,SHANTELL'S CHANNEL - https://www.youtube.com/s...
1,1ZAPwfrtAFY,17.14.11,The Trump Presidency: Last Week Tonight with J...,LastWeekTonight,24,2017-11-13T07:30:00.000Z,"""last week tonight trump presidency""|""last wee...",2418783,97185,6146,12703,https://i.ytimg.com/vi/1ZAPwfrtAFY/default.jpg,False,False,False,"One year after the presidential election, John..."
2,5qpjK5DgCt4,17.14.11,"Racist Superman | Rudy Mancuso, King Bach & Le...",Rudy Mancuso,23,2017-11-12T19:05:24.000Z,"""racist superman""|""rudy""|""mancuso""|""king""|""bac...",3191434,146033,5339,8181,https://i.ytimg.com/vi/5qpjK5DgCt4/default.jpg,False,False,False,WATCH MY PREVIOUS VIDEO ▶ \n\nSUBSCRIBE ► http...
3,puqaWrEC7tY,17.14.11,Nickelback Lyrics: Real or Fake?,Good Mythical Morning,24,2017-11-13T11:00:04.000Z,"""rhett and link""|""gmm""|""good mythical morning""...",343168,10172,666,2146,https://i.ytimg.com/vi/puqaWrEC7tY/default.jpg,False,False,False,Today we find out if Link is a Nickelback amat...


**Regex**

Regex is used to replace or extract all substrings of the specified string value that match regexp with rep.
regexp_replace(str, pattern, replacement)
for more info on regex calls visit: https://docs.oracle.com/cd/B19306_01/server.102/b14200/ap_posix001.htm#BABJDBHB

In [153]:
from pyspark.sql.functions import regexp_replace, regexp_extract
import pyspark.sql.functions as f

df = df.withColumn('publish_time',regexp_replace(df.publish_time, 'T', ' '))
df = df.withColumn('publish_time',regexp_replace(df.publish_time, 'Z', ''))
df = df.withColumn("publish_time", to_timestamp(df.publish_time, 'yyyy-MM-dd HH:mm:ss.SSS'))
print(df.printSchema())
df.limit(4).toPandas()

root
 |-- video_id: string (nullable = true)
 |-- trending_date: string (nullable = true)
 |-- title: string (nullable = true)
 |-- channel_title: string (nullable = true)
 |-- category_id: string (nullable = true)
 |-- publish_time: timestamp (nullable = true)
 |-- tags: string (nullable = true)
 |-- views: string (nullable = true)
 |-- likes: string (nullable = true)
 |-- dislikes: string (nullable = true)
 |-- comment_count: string (nullable = true)
 |-- thumbnail_link: string (nullable = true)
 |-- comments_disabled: string (nullable = true)
 |-- ratings_disabled: string (nullable = true)
 |-- video_error_or_removed: string (nullable = true)
 |-- description: string (nullable = true)

None


Unnamed: 0,video_id,trending_date,title,channel_title,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,2kyS6SvSYSE,17.14.11,WE WANT TO TALK ABOUT OUR MARRIAGE,CaseyNeistat,22,2017-11-13 17:13:01,SHANtell martin,748374,57527,2966,15954,https://i.ytimg.com/vi/2kyS6SvSYSE/default.jpg,False,False,False,SHANTELL'S CHANNEL - https://www.youtube.com/s...
1,1ZAPwfrtAFY,17.14.11,The Trump Presidency: Last Week Tonight with J...,LastWeekTonight,24,2017-11-13 07:30:00,"""last week tonight trump presidency""|""last wee...",2418783,97185,6146,12703,https://i.ytimg.com/vi/1ZAPwfrtAFY/default.jpg,False,False,False,"One year after the presidential election, John..."
2,5qpjK5DgCt4,17.14.11,"Racist Superman | Rudy Mancuso, King Bach & Le...",Rudy Mancuso,23,2017-11-12 19:05:24,"""racist superman""|""rudy""|""mancuso""|""king""|""bac...",3191434,146033,5339,8181,https://i.ytimg.com/vi/5qpjK5DgCt4/default.jpg,False,False,False,WATCH MY PREVIOUS VIDEO ▶ \n\nSUBSCRIBE ► http...
3,puqaWrEC7tY,17.14.11,Nickelback Lyrics: Real or Fake?,Good Mythical Morning,24,2017-11-13 11:00:04,"""rhett and link""|""gmm""|""good mythical morning""...",343168,10172,666,2146,https://i.ytimg.com/vi/puqaWrEC7tY/default.jpg,False,False,False,Today we find out if Link is a Nickelback amat...


**Translate Function**

In [154]:
# You can also use the translate function for cases like this 
# where you wanted to replace ('$', '#', ',') with ('X', 'Y', 'Z')
import pyspark.sql.functions as f
foobar = spark.createDataFrame([("$100,00",),("#foobar",),("foo, bar, #, and $",)], ["A"])
foobar.select("A", f.translate(f.col("A"), "$#,", "XYZ").alias("replaced")).show()

+------------------+------------------+
|                 A|          replaced|
+------------------+------------------+
|           $100,00|           X100Z00|
|           #foobar|           Yfoobar|
|foo, bar, #, and $|fooZ barZ YZ and X|
+------------------+------------------+



**Trim**

In [155]:
# Trim
# pyspark.sql.functions.trim(col) - Trim the spaces from both ends for the specified string column.
from pyspark.sql.functions import *

trim_ex = spark.createDataFrame([(' 2015-04-08 ',' 2015-05-10 ')], ['d1', 'd2']) # create a dataframe - notice the extra whitespaces in the date strings
trim_ex.show()
print("left trim")
trim_ex.select('d1', ltrim(trim_ex.d1)).show()
print("right trim")
trim_ex.select('d1', rtrim(trim_ex.d1)).show()
print("trim")
trim_ex.select('d1', trim(trim_ex.d1)).show()

+------------+------------+
|          d1|          d2|
+------------+------------+
| 2015-04-08 | 2015-05-10 |
+------------+------------+

left trim
+------------+-----------+
|          d1|  ltrim(d1)|
+------------+-----------+
| 2015-04-08 |2015-04-08 |
+------------+-----------+

right trim
+------------+-----------+
|          d1|  rtrim(d1)|
+------------+-----------+
| 2015-04-08 | 2015-04-08|
+------------+-----------+

trim
+------------+----------+
|          d1|  trim(d1)|
+------------+----------+
| 2015-04-08 |2015-04-08|
+------------+----------+



**Case When**

In [156]:
df = spark.createDataFrame([(1,1),(2,2),(3,3)],['id','value'])

print("Sample Dataframe:")
df.show()

print("Option#1: withColumn() using when-otherwise")
from pyspark.sql.functions import when
df.withColumn("value_desc",when(df.value == 1, 'one').when(df.value == 2, 'two').otherwise('other')).show()

print("Option2: withColumn() using expr function")
from pyspark.sql.functions import expr 
df.withColumn("value_desc",expr("CASE WHEN value == 1 THEN  'one' WHEN value == 2 THEN  'two' ELSE 'other' END AS value_desc")).show()

print("Option 3: selectExpr() using SQL equivalent CASE expression")
fifa.selectExpr("*","CASE WHEN value == 1 THEN  'one' WHEN value == 2 THEN  'two' ELSE 'other' END AS value_desc").show()

print("Option 4: select() using expr function")
from pyspark.sql.functions import expr 
df.select("*",expr("CASE WHEN value == 1 THEN  'one' WHEN value == 2 THEN  'two' ELSE 'other' END AS value_desc")).show()

Sample Dataframe:
+---+-----+
| id|value|
+---+-----+
|  1|    1|
|  2|    2|
|  3|    3|
+---+-----+

Option#1: withColumn() using when-otherwise
+---+-----+----------+
| id|value|value_desc|
+---+-----+----------+
|  1|    1|       one|
|  2|    2|       two|
|  3|    3|     other|
+---+-----+----------+

Option2: withColumn() using expr function
+---+-----+----------+
| id|value|value_desc|
+---+-----+----------+
|  1|    1|       one|
|  2|    2|       two|
|  3|    3|     other|
+---+-----+----------+

Option 3: selectExpr() using SQL equivalent CASE expression
+---+------+-----------------+---+--------------------+-----------+--------------------+-------+---------+-------------------+--------------------+-------+-----+-------+--------------+------------------------+---------+-----------+--------------+----------+---------+--------+-------------+------------+-----------+--------------------+------+------+----+----+----+----+----+----+----+----+----+----+----+----+----+----+----+--

+---+-----+----------+
| id|value|value_desc|
+---+-----+----------+
|  1|    1|       one|
|  2|    2|       two|
|  3|    3|     other|
+---+-----+----------+



In [157]:
# Use in between to filter values
df = spark.createDataFrame([(1,1),(2,2),(3,3)],['id','value'])
df.filter(df.value.between(1,2)).show()

+---+-----+
| id|value|
+---+-----+
|  1|    1|
|  2|    2|
+---+-----+



**Creating new columns calculated using existing columns**

In [159]:
# Add a new column from an existing column like this....
# withColumn(colName, col)[source]
# Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
# The column expression must be an expression over this DataFrame; attempting to add a column from some other dataframe will raise an error.

# Parameters
# colName – string, name of the new column.

# col – a Column expression for the new column.
df = spark.read.csv(path+'youtubevideos.csv', inferSchema=True,header=True)
views = df.withColumn('views_x_2', df.views * 2)
views.select(['views','views_x_2']).show(4)

+-------+---------+
|  views|views_x_2|
+-------+---------+
| 748374|1496748.0|
|2418783|4837566.0|
|3191434|6382868.0|
| 343168| 686336.0|
+-------+---------+
only showing top 4 rows



In [160]:
# You can also use this method to overwrite a column
views = views.withColumn('views', views.views * 2)
views.select(['views','views_x_2']).show(4)

+---------+---------+
|    views|views_x_2|
+---------+---------+
|1496748.0|1496748.0|
|4837566.0|4837566.0|
|6382868.0|6382868.0|
| 686336.0| 686336.0|
+---------+---------+
only showing top 4 rows



**Renaming Columns**

In [161]:
# Simple Rename
renamed = df.withColumnRenamed('channel_title','channel_title_new')
renamed.limit(4).toPandas()

Unnamed: 0,video_id,trending_date,title,channel_title_new,category_id,publish_time,tags,views,likes,dislikes,comment_count,thumbnail_link,comments_disabled,ratings_disabled,video_error_or_removed,description
0,2kyS6SvSYSE,17.14.11,WE WANT TO TALK ABOUT OUR MARRIAGE,CaseyNeistat,22,2017-11-13T17:13:01.000Z,SHANtell martin,748374,57527,2966,15954,https://i.ytimg.com/vi/2kyS6SvSYSE/default.jpg,False,False,False,SHANTELL'S CHANNEL - https://www.youtube.com/s...
1,1ZAPwfrtAFY,17.14.11,The Trump Presidency: Last Week Tonight with J...,LastWeekTonight,24,2017-11-13T07:30:00.000Z,"""last week tonight trump presidency""|""last wee...",2418783,97185,6146,12703,https://i.ytimg.com/vi/1ZAPwfrtAFY/default.jpg,False,False,False,"One year after the presidential election, John..."
2,5qpjK5DgCt4,17.14.11,"Racist Superman | Rudy Mancuso, King Bach & Le...",Rudy Mancuso,23,2017-11-12T19:05:24.000Z,"""racist superman""|""rudy""|""mancuso""|""king""|""bac...",3191434,146033,5339,8181,https://i.ytimg.com/vi/5qpjK5DgCt4/default.jpg,False,False,False,WATCH MY PREVIOUS VIDEO ▶ \n\nSUBSCRIBE ► http...
3,puqaWrEC7tY,17.14.11,Nickelback Lyrics: Real or Fake?,Good Mythical Morning,24,2017-11-13T11:00:04.000Z,"""rhett and link""|""gmm""|""good mythical morning""...",343168,10172,666,2146,https://i.ytimg.com/vi/puqaWrEC7tY/default.jpg,False,False,False,Today we find out if Link is a Nickelback amat...


**Concatenate**

In [162]:
from pyspark.sql.types import * #IntegerType

# Concatenate columns
# pyspark.sql.functions.concat_ws(sep, *cols)[source]
# Concatenates multiple input string columns together into a single string column, using the given separator.

names = spark.createDataFrame([('Abraham','Lincoln')], ['first_name', 'last_name'])
names.select(names.first_name,names.last_name,concat_ws(' ', names.first_name, names.last_name).alias('full_name')).show()

+----------+---------+---------------+
|first_name|last_name|      full_name|
+----------+---------+---------------+
|   Abraham|  Lincoln|Abraham Lincoln|
+----------+---------+---------------+



**Extracting from Date and Timestamp variables**

In [166]:
# Extract year, month, day etc. from a date field
# Other options: dayofmonth, dayofweek, dayofyear, weekofyear
import pyspark.sql.functions as fn
year = df.withColumn("TRENDING_YEAR",fn.year("trending_date")) \
         .withColumn("TRENDING_MONTH",fn.month("trending_date"))
#QA
year.filter("TRENDING_YEAR=2011").select(['trending_date','TRENDING_YEAR','TRENDING_MONTH']).show()


+-------------+-------------+--------------+
|trending_date|TRENDING_YEAR|TRENDING_MONTH|
+-------------+-------------+--------------+
+-------------+-------------+--------------+



In [167]:
# Calculate the difference between two dates:
# pyspark.sql.functions.datediff(end, start)
# Returns the number of days from start to end.

date_df = spark.createDataFrame([('2015-04-08','2015-05-10')], ['d1', 'd2'])
date_df.select(datediff(date_df.d2, date_df.d1).alias('diff')).show()

+----+
|diff|
+----+
|  32|
+----+



**Splitting a string around a pattern**

In [168]:
# Split a string around pattern (pattern is a regular expression).
from pyspark.sql.functions import *
# pyspark.sql.functions.split(str, pattern)[source]

abc = spark.createDataFrame([('ab12cd',)], ['s',])
abc.select(abc.s,split(abc.s, '[0-9]+').alias('news')).show()

+------+--------+
|     s|    news|
+------+--------+
|ab12cd|[ab, cd]|
+------+--------+



**Arrays**

*Note that the array_distinct feature is new in Spark 2.4. 

In [169]:
# Arrays - col/cols – list of column names (string) or list of Column expressions that have the same data type.
# pyspark.sql.functions
# note this is only available in pyspark 2.4+
from pyspark.sql.functions import *
#      .array(*cols)   -   Creates a new array column.
#      .array_contains(col, value)  - Collection function: returns null if the array is null, true if the array contains the given value, and false otherwise.
#      .array_distinct(col) - Collection function: removes duplicate values from the array. :param col: name of column or expression
#      .array_except(col1, col2) - Collection function: returns an array of the elements in col1 but not in col2, without duplicates.
#      .array_intersect(col1, col2) - Collection function: returns an array of the elements in the intersection of col1 and col2, without duplicates.
#      .array_join(col, delimiter, null_replacement=None) - Concatenates the elements of column using the delimiter. Null values are replaced with null_replacement if set, otherwise they are ignored.
#      .array_max(col) - Collection function: returns the maximum value of the array.
#      .array_min(col) - Collection function: returns the minimum value of the array.
#      .array_position(col, value) - Collection function: Locates the position of the first occurrence of the given value in the given array. Returns null if either of the arguments are null.
#      .array_remove(col, element)- Collection function: Remove all elements that equal to element from the given array.
#      .array_repeat(col, count) - Collection function: creates an array containing a column repeated count times.
#      .array_sort(col) - Collection function: sorts the input array in ascending order. The elements of the input array must be orderable. Null elements will be placed at the end of the returned array.
#      .array_union(col1, col2) - Collection function: returns an array of the elements in the union of col1 and col2, without duplicates.
#      .arrays_overlap(a1, a2) - Collection function: returns true if the arrays contain any common non-null element; if not, returns null if both the arrays are non-empty and any of them contains a null element; returns false otherwise.
#      .arrays_zip(*cols)[source] - Collection function: Returns a merged array of structs in which the N-th struct contains all N-th values of input arrays.

customer = spark.createDataFrame([('coffee','milk','coffee','coffee','chocolate','')], ['item1', 'item2','item3','item4','item5','item6'])
purchases = customer.select(array('item1', 'item2','item3').alias("Monday"),\
                            array('item4', 'item5','item6').alias("Tuesday"))

print("array")
purchases.show()

print("Which customers purchased milk? array_contains")
purchases.select(array_contains(purchases.Monday, "milk")).show(1, False)

print("List of unique products purchased on Monday: array_distinct")
purchases.select(array_distinct(purchases.Monday)).show(1, False)

print("What did our customers order on Monday but not Tuesday? array_except")
purchases.select(array_except(purchases.Monday, purchases.Tuesday)).show(1, False)

print("What did our customers order on BOTH Monday and Tuesday?: array_intersect")
purchases.select(array_intersect(purchases.Monday, purchases.Tuesday)).show(1, False)

print("All purchases on monday in a string: array_join")
purchases.select(array_join(purchases.Monday, ',')).show(1, False)

array
+--------------------+--------------------+
|              Monday|             Tuesday|
+--------------------+--------------------+
|[coffee, milk, co...|[coffee, chocolat...|
+--------------------+--------------------+

Which customers purchased milk? array_contains
+----------------------------+
|array_contains(Monday, milk)|
+----------------------------+
|true                        |
+----------------------------+

List of unique products purchased on Monday: array_distinct
+----------------------+
|array_distinct(Monday)|
+----------------------+
|[coffee, milk]        |
+----------------------+

What did our customers order on Monday but not Tuesday? array_except
+-----------------------------+
|array_except(Monday, Tuesday)|
+-----------------------------+
|[milk]                       |
+-----------------------------+

What did our customers order on BOTH Monday and Tuesday?: array_intersect
+--------------------------------+
|array_intersect(Monday, Tuesday)|
+---------

**Create an array by splitting a string field** 

In [170]:
from pyspark.sql.functions import *
values = [(45,'I like to ride bikes'), \
          (14,'I like chicken'), \
          (63,'I like bubbles'), \
          (75,'I like roller coasters'), \
          (24,'I like shuffle board'), \
          (45,'I like to swim')]
sentences = spark.createDataFrame(values,['age', 'sentence'])
df = sentences.withColumn("array", split(col("sentence"), " "))
df.show(1,False)

+---+--------------------+--------------------------+
|age|sentence            |array                     |
+---+--------------------+--------------------------+
|45 |I like to ride bikes|[I, like, to, ride, bikes]|
+---+--------------------+--------------------------+
only showing top 1 row



## Creating Functions

Functions as you know them in Python work a bit differently in Pyspark because it operates on a cluster. If you define a function the traditional Python way in PySpark, you will not recieve an error message but the call will not distribute on all nodes. So it will run slower. 

So to convert a Python function to what's called a user defined function (UDF) in PySpark. This is what you do.

In [171]:
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

def square(x):
    return int(x**2)
square_udf = udf(lambda z: square(z), IntegerType())

df.select('age',square_udf('age').alias('age_squared')).show()

+---+-----------+
|age|age_squared|
+---+-----------+
| 45|       2025|
| 14|        196|
| 63|       3969|
| 75|       5625|
| 24|        576|
| 45|       2025|
+---+-----------+

