#Preparation

Install Pyspark

In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[K     |████████████████████████████████| 281.4 MB 45 kB/s 
[?25hCollecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[K     |████████████████████████████████| 199 kB 69.5 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=deca89621acb20f879975c48535f1ca5b3d2319e2ad3f4dbdd0b1dec0709edf6
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9.5 pyspark-3.3.1


Import Pyspark Package

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

Upload File

In [None]:
from google.colab import files
uploaded = files.upload()

Saving Dataset.zip to Dataset.zip


In [None]:
!unzip Dataset.zip

Archive:  Dataset.zip
   creating: Dataset/
 extracting: Dataset/cities1.csv     
  inflating: Dataset/cities1_schema.csv  
 extracting: Dataset/cities2.csv     
   creating: Dataset/exercise/
  inflating: Dataset/exercise/AkunTwitter_POS.csv  
  inflating: Dataset/exercise/HashtagTwitter_POS.csv  
  inflating: Dataset/exercise/Instagram_POS.json  
   creating: Dataset/json/
 extracting: Dataset/json/user1.json  
 extracting: Dataset/json/user2.json  
 extracting: Dataset/pcodes.csv      
  inflating: Dataset/people-no-pcode.csv  
  inflating: Dataset/purplecow.txt   
 extracting: Dataset/zcodes.csv      


Initiate Spark

In [None]:
spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("test") \
    .getOrCreate()

In [None]:
spark

# Read Data From File Source

Read Data

In [None]:
myDF = spark.read.format("csv").load("Dataset/cities1_schema.csv")

View Data

In [None]:
myDF.show()
myDF.printSchema()
myDF.dtypes

+---------+-----+----------+
|      _c0|  _c1|       _c2|
+---------+-----+----------+
|     City|State|Population|
|   Boston|   MA|      1000|
|Palo Alto|   CA|      2000|
| Santa Fe|   NM|      3000|
|Palo Alto|   CA|      4000|
+---------+-----+----------+

root
 |-- _c0: string (nullable = true)
 |-- _c1: string (nullable = true)
 |-- _c2: string (nullable = true)



[('_c0', 'string'), ('_c1', 'string'), ('_c2', 'string')]

In [None]:
myDF = spark.read.format("csv"). \
option("inferSchema","true"). \
option("header","true"). \
load("Dataset/cities1_schema.csv")

In [None]:
myDF.show()
myDF.printSchema()
myDF.dtypes

+---------+-----+----------+
|     City|State|Population|
+---------+-----+----------+
|   Boston|   MA|      1000|
|Palo Alto|   CA|      2000|
| Santa Fe|   NM|      3000|
|Palo Alto|   CA|      4000|
+---------+-----+----------+

root
 |-- City: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Population: integer (nullable = true)



[('City', 'string'), ('State', 'string'), ('Population', 'int')]

In [None]:
myDF.head()

Row(City='Boston', State='MA', Population=1000)

In [None]:
myDF.show()

+---------+-----+----------+
|     City|State|Population|
+---------+-----+----------+
|   Boston|   MA|      1000|
|Palo Alto|   CA|      2000|
| Santa Fe|   NM|      3000|
|Palo Alto|   CA|      4000|
+---------+-----+----------+



Read from json

In [None]:
myDF = spark.read.format("json").load("Dataset/json/user1.json")

In [None]:
myDF.show()
myDF.printSchema()

+---------+----------+------+
|firstName|  lastName|userid|
+---------+----------+------+
|     Fred|Flintstone|   123|
+---------+----------+------+

root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- userid: string (nullable = true)



# Create and Write DataFrame

Create DataFrame

In [None]:
# mydata = [["Josiah","Bartlett",33],["Harry","Potter",20]]

mydata = [
          {
              "firstName": "Josiah",
              "lastName": "Bartlett",
              "age": 33
          },
          {
              "firstName": "Harry",
              "lastName": "Potter",
              "age": 20
          },
          {
              "firstName": "Josiah",
              "lastName": "Bartlett",
              "age": 33
          },
          {
              "firstName": "Harry",
              "lastName": "Potter",
              "age": 20
          },
          {
              "firstName": "Josiah",
              "lastName": "Bartlett",
              "age": 33
          },
          {
              "firstName": "Harry",
              "lastName": "Potter",
              "age": 25
          },
          {
              "firstName": "Josiah",
              "lastName": "Bartlett",
              "age": 33
          },
          {
              "firstName": "Harry",
              "lastName": "Potter",
              "age": 25
          },
  ]

myDF = spark.createDataFrame(mydata)
myDF.printSchema()

root
 |-- age: long (nullable = true)
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)



In [None]:
myDF.show(5)

+---+---------+--------+
|age|firstName|lastName|
+---+---------+--------+
| 33|   Josiah|Bartlett|
| 20|    Harry|  Potter|
| 33|   Josiah|Bartlett|
| 20|    Harry|  Potter|
| 33|   Josiah|Bartlett|
+---+---------+--------+
only showing top 5 rows



In [None]:
columnsList = [
  StructField("firstName", StringType()),
  StructField("lastName", StringType()),
  StructField("age", IntegerType())]

schema = StructType(columnsList)

In [None]:
myDF = spark.createDataFrame(mydata, schema)
myDF.printSchema()
myDF.show()

root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- age: integer (nullable = true)

+---------+--------+---+
|firstName|lastName|age|
+---------+--------+---+
|   Josiah|Bartlett| 33|
|    Harry|  Potter| 20|
|   Josiah|Bartlett| 33|
|    Harry|  Potter| 20|
|   Josiah|Bartlett| 33|
|    Harry|  Potter| 25|
|   Josiah|Bartlett| 33|
|    Harry|  Potter| 25|
+---------+--------+---+



In [None]:
# write dataframe with partition
# save data using parquet, compress, & partition
myDF.write.mode('overwrite').partitionBy('age')\
        .option('compression', 'snappy')\
        .option("partitionOverwriteMode", "dynamic")\
        .save('data_result')

In [None]:
%%time
# read data parquet
dataDFParquet = spark.read.parquet('data_result')
dataDFParquet.createOrReplaceTempView("data_result")

CPU times: user 3.16 ms, sys: 1.06 ms, total: 4.22 ms
Wall time: 626 ms


Write DataFrame

In [None]:
# myDF.write.json("mydata_fix2")

In [None]:
# myDF.write.mode("append").json("mydata_fix2")

In [None]:
# myDF.write.mode("overwrite").json("mydata_fix2")

# Working with Different Types of Data

In [None]:
myDF.show()

+---------+--------+---+
|firstName|lastName|age|
+---------+--------+---+
|   Josiah|Bartlett| 33|
|    Harry|  Potter| 20|
+---------+--------+---+



Boolean

In [None]:
# select lastName, age from myDF
myDF.select("lastName", "age").show()

+--------+---+
|lastName|age|
+--------+---+
|Bartlett| 33|
|  Potter| 20|
+--------+---+



In [None]:
# select * from myDF where lastName = 'Bartlett'
myDF.where(myDF["lastName"] == "Bartlett").show()

+---------+--------+---+
|firstName|lastName|age|
+---------+--------+---+
|   Josiah|Bartlett| 33|
+---------+--------+---+



In [None]:
# select * from myDF where lastName != 'Bartlett'
myDF.where(myDF["lastName"] != "Bartlett").show()

+---------+--------+---+
|firstName|lastName|age|
+---------+--------+---+
|    Harry|  Potter| 20|
+---------+--------+---+



In [None]:
myDF.select('lastName', 'age').where(myDF['age'] > 20).show()

+--------+---+
|lastName|age|
+--------+---+
|Bartlett| 33|
+--------+---+



In [None]:
result_1 = myDF.select('lastName', 'age').where(myDF['age'] > 20)

In [None]:
result_2 = myDF.select('lastName', 'age')
result_2 = result_2.where(result_2['age'] > 20)

In [None]:
result_2.show()

+--------+---+
|lastName|age|
+--------+---+
|Bartlett| 33|
+--------+---+



In [None]:
result_1.show()

+--------+---+
|lastName|age|
+--------+---+
|Bartlett| 33|
+--------+---+



Working With Number

In [None]:
# select * from myDF where age > 20
myDF.where(myDF["age"] > 20).show()

+---------+--------+---+
|firstName|lastName|age|
+---------+--------+---+
|   Josiah|Bartlett| 33|
+---------+--------+---+



In [None]:
myDF.printSchema()

root
 |-- firstName: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- age: integer (nullable = true)



In [None]:
# select lastName as "Last Name", age*10 as "age * 10" from myDF
myDF.select(myDF["lastName"].alias("Last Name"), (col("age")*10).alias('age * 10')).show()

+---------+--------+
|Last Name|age * 10|
+---------+--------+
| Bartlett|     330|
|   Potter|     200|
+---------+--------+



In [None]:
# select *, age*10 as age_1 from myDF
myDF = myDF.withColumn("age_1", col("age")*10)
myDF.show()

+---------+--------+---+-----+
|firstName|lastName|age|age_1|
+---------+--------+---+-----+
|   Josiah|Bartlett| 33|  330|
|    Harry|  Potter| 20|  200|
+---------+--------+---+-----+



Working With String

In [None]:
# select myDF.*, case when age > 25 then adult else kid end as age_group from myDF as myDF
myDF = myDF.withColumn("age_group", when(col("age") > 25, lit("Adult")).otherwise(lit("Kid")))
myDF.show()

+---------+--------+---+-----+---------+
|firstName|lastName|age|age_1|age_group|
+---------+--------+---+-----+---------+
|   Josiah|Bartlett| 33|  330|    Adult|
|    Harry|  Potter| 20|  200|      Kid|
+---------+--------+---+-----+---------+



In [None]:
# select myDF.*, concat(firstName, " ", lastName) as fullName from myDF as myDF
myDF = myDF.withColumn("fullName", concat(col("firstName"), lit(" "), col("lastName")))
myDF.show()

+---------+--------+---+-----+---------+---------------+
|firstName|lastName|age|age_1|age_group|       fullName|
+---------+--------+---+-----+---------+---------------+
|   Josiah|Bartlett| 33|  330|    Adult|Josiah Bartlett|
|    Harry|  Potter| 20|  200|      Kid|   Harry Potter|
+---------+--------+---+-----+---------+---------------+



In [None]:
myDF.select(upper(col("fullName")).alias("fullName")).show()

+---------------+
|       fullName|
+---------------+
|JOSIAH BARTLETT|
|   HARRY POTTER|
+---------------+



In [None]:
myDF.select(lower(col("fullName")).alias("fullName")).show()

+---------------+
|       fullName|
+---------------+
|josiah bartlett|
|   harry potter|
+---------------+



Complex Data

In [None]:
instaDF = spark.read.format("json").load("Dataset/exercise/Instagram_POS.json")

In [None]:
instaDF.show()
instaDF.printSchema()

+--------------------+--------------------+--------------------+--------------------+--------------------+
|             caption|            comments|            datetime|            img_urls|                 key|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|Oranger menjaga s...|[{mamah.asyraf, C...|2019-07-13T04:06:...|[https://scontent...|https://www.insta...|
|Oranger, layanan ...|[{rizqimuhammad77...|2019-07-12T09:55:...|[https://scontent...|https://www.insta...|
|Selamat hari kope...|[{evan_mardiyanto...|2019-07-12T02:21:...|[https://scontent...|https://www.insta...|
|Kuliah? di @polte...|[{dicky.ra_, @pos...|2019-07-11T05:19:...|[https://scontent...|https://www.insta...|
|Selamat ulang tah...|[{hey.rudd, Kak.....|2019-07-11T01:22:...|[https://scontent...|https://www.insta...|
|Kiriman lewat Pos...|[{rienlilitanty, ...|2019-07-10T10:46:...|[https://scontent...|https://www.insta...|
|Sedang mencari ka...|[{nurdianing, m

In [None]:
instaDF.select('comments').show(2)

+--------------------+
|            comments|
+--------------------+
|[{mamah.asyraf, C...|
|[{rizqimuhammad77...|
+--------------------+
only showing top 2 rows



In [None]:
instaDF.select('comments.author').show(2)

+--------------------+
|              author|
+--------------------+
|[mamah.asyraf, po...|
|[rizqimuhammad77,...|
+--------------------+
only showing top 2 rows



In [None]:
instaDF.select('comments.comment').show(2)

+--------------------+
|             comment|
+--------------------+
|[Cek DM, Baik, Sa...|
|[@ari_hr04 @adi04...|
+--------------------+
only showing top 2 rows



In [None]:
def FlatDF(schema, prefix=None):
        fields = []
        for field in schema.fields:
            name = prefix + '.' + field.name if prefix else field.name
            dtype = field.dataType
            if isinstance(dtype, ArrayType):
                dtype = dtype.elementType

            if isinstance(dtype, StructType):
                fields += FlatDF(dtype, prefix=name)
            else:
                fields.append(name)

        return fields

In [None]:
instaDF = instaDF.select(FlatDF(instaDF.schema))

In [None]:
instaDF.show()
instaDF.printSchema()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|             caption|              author|             comment|            datetime|            img_urls|                 key|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Oranger menjaga s...|[mamah.asyraf, po...|[Cek DM, Baik, Sa...|2019-07-13T04:06:...|[https://scontent...|https://www.insta...|
|Oranger, layanan ...|[rizqimuhammad77,...|[@ari_hr04 @adi04...|2019-07-12T09:55:...|[https://scontent...|https://www.insta...|
|Selamat hari kope...|[evan_mardiyanto,...|[Min mohon segera...|2019-07-12T02:21:...|[https://scontent...|https://www.insta...|
|Kuliah? di @polte...|[dicky.ra_, posin...|[@posindonesia.ig...|2019-07-11T05:19:...|[https://scontent...|https://www.insta...|
|Selamat ulang tah...|[hey.rudd, jovian...|[Kak.. mau tanya....|2019-07-11T01:22:...|[https://scontent..

In [None]:
instaDF.select(col("author")[1].alias('author-1'), col("comment")[1].alias('comment-1')).show()

+-------------------+--------------------+
|           author-1|           comment-1|
+-------------------+--------------------+
|    posindonesia.ig|Baik, Sahabat, mo...|
|griyakulakannganjuk|Kirim paket belum...|
|       ojombokfolou|SANGAT KECEWA. sa...|
|    posindonesia.ig|Hai sahabat, kiri...|
|   jovian_aryodhito|@mamah.asyraf trs...|
|    posindonesia.ig|Hai sahabat, kiri...|
|    posindonesia.ig|Halo Sahabat pos....|
|    posindonesia.ig|Layanan tersebut ...|
|          leylyeyle|@posindonesia.ig ...|
|      irenerufianti|Min.. kirim parce...|
|               null|                null|
|               null|                null|
|    posindonesia.ig|Baik Sahabat, jik...|
|    posindonesia.ig|Sama-sama Sahabat...|
|             xxyn99|@syamsul_manalu23...|
|        nyomandante|Semoga mendapat t...|
|     intanardianty_|Tingkatkan pelaya...|
|        nana_munzir|Ka. Kenapa ya pak...|
|    posindonesia.ig|Mohon maaf atas k...|
|    posindonesia.ig|Halo sahabat pos....|
+----------

Working with Date

In [None]:
instaDF.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|             caption|              author|             comment|            datetime|            img_urls|                 key|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Oranger menjaga s...|[mamah.asyraf, po...|[Cek DM, Baik, Sa...|2019-07-13T04:06:...|[https://scontent...|https://www.insta...|
|Oranger, layanan ...|[rizqimuhammad77,...|[@ari_hr04 @adi04...|2019-07-12T09:55:...|[https://scontent...|https://www.insta...|
|Selamat hari kope...|[evan_mardiyanto,...|[Min mohon segera...|2019-07-12T02:21:...|[https://scontent...|https://www.insta...|
|Kuliah? di @polte...|[dicky.ra_, posin...|[@posindonesia.ig...|2019-07-11T05:19:...|[https://scontent...|https://www.insta...|
|Selamat ulang tah...|[hey.rudd, jovian...|[Kak.. mau tanya....|2019-07-11T01:22:...|[https://scontent..

In [None]:
instaDF = instaDF.withColumn("timestamp", to_timestamp("datetime"))

In [None]:
instaDF = instaDF.withColumn("date", to_date("datetime"))

In [None]:
instaDF.show()

+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+----------+
|             caption|              author|             comment|            datetime|            img_urls|                 key|          timestamp|      date|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+-------------------+----------+
|Oranger menjaga s...|[mamah.asyraf, po...|[Cek DM, Baik, Sa...|2019-07-13T04:06:...|[https://scontent...|https://www.insta...|2019-07-13 04:06:50|2019-07-13|
|Oranger, layanan ...|[rizqimuhammad77,...|[@ari_hr04 @adi04...|2019-07-12T09:55:...|[https://scontent...|https://www.insta...|2019-07-12 09:55:01|2019-07-12|
|Selamat hari kope...|[evan_mardiyanto,...|[Min mohon segera...|2019-07-12T02:21:...|[https://scontent...|https://www.insta...|2019-07-12 02:21:13|2019-07-12|
|Kuliah? di @polte...|[dicky.ra_, posin...|[@p

In [None]:
instaDF.printSchema()

root
 |-- caption: string (nullable = true)
 |-- author: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- comment: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- datetime: string (nullable = true)
 |-- img_urls: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- key: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- date: date (nullable = true)



In [None]:
instaDF.select(date_sub("date", 5), date_add("datetime", 5)).show(5)

+-----------------+---------------------+
|date_sub(date, 5)|date_add(datetime, 5)|
+-----------------+---------------------+
|       2019-07-08|           2019-07-18|
|       2019-07-07|           2019-07-17|
|       2019-07-07|           2019-07-17|
|       2019-07-06|           2019-07-16|
|       2019-07-06|           2019-07-16|
+-----------------+---------------------+
only showing top 5 rows



Working with Null Data

In [None]:
peopleDF = spark.read.option("header","true").csv("Dataset/people-no-pcode.csv")
pcodesDF = spark.read.option("header","true").csv("Dataset/pcodes.csv")

In [None]:
peopleDF.show()
peopleDF.printSchema()
pcodesDF.show()
pcodesDF.printSchema()

+-----+--------+---------+---+
|pcode|lastName|firstName|age|
+-----+--------+---------+---+
|02134|  Hopper|    Grace| 52|
| null|  Turing|     Alan| 32|
|94020|Lovelace|      Ada| 28|
|87501| Babbage|  Charles| 49|
|02134|   Wirth|  Niklaus| 48|
+-----+--------+---------+---+

root
 |-- pcode: string (nullable = true)
 |-- lastName: string (nullable = true)
 |-- firstName: string (nullable = true)
 |-- age: string (nullable = true)

+-----+---------+-----+
|pcode|     city|state|
+-----+---------+-----+
|02134|   Boston|   MA|
|94020|Palo Alto|   CA|
|87501| Santa Fe|   NM|
|60645|  Chicago|   IL|
+-----+---------+-----+

root
 |-- pcode: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)



In [None]:
peopleDF.select(coalesce(col("pcode"), col("lastName"))).show()

+-------------------------+
|coalesce(pcode, lastName)|
+-------------------------+
|                    02134|
|                   Turing|
|                    94020|
|                    87501|
|                    02134|
+-------------------------+



In [None]:
peopleDF.na.drop("any").show()

+-----+--------+---------+---+
|pcode|lastName|firstName|age|
+-----+--------+---------+---+
|02134|  Hopper|    Grace| 52|
|94020|Lovelace|      Ada| 28|
|87501| Babbage|  Charles| 49|
|02134|   Wirth|  Niklaus| 48|
+-----+--------+---------+---+



In [None]:
peopleDF.na.fill("00000").show()

+-----+--------+---------+---+
|pcode|lastName|firstName|age|
+-----+--------+---------+---+
|02134|  Hopper|    Grace| 52|
|00000|  Turing|     Alan| 32|
|94020|Lovelace|      Ada| 28|
|87501| Babbage|  Charles| 49|
|02134|   Wirth|  Niklaus| 48|
+-----+--------+---------+---+



# Aggregations

Aggregate

In [None]:
peopleDF.show(5)

+-----+--------+---------+---+
|pcode|lastName|firstName|age|
+-----+--------+---------+---+
|02134|  Hopper|    Grace| 52|
| null|  Turing|     Alan| 32|
|94020|Lovelace|      Ada| 28|
|87501| Babbage|  Charles| 49|
|02134|   Wirth|  Niklaus| 48|
+-----+--------+---------+---+



In [None]:
# select count(*) from peopleDF
peopleDF.select(count("*")).show()

+--------+
|count(1)|
+--------+
|       5|
+--------+



In [None]:
# select count(distinct(pdcode)) from peopleDF
peopleDF.select(countDistinct("pcode")).show()

+---------------------+
|count(DISTINCT pcode)|
+---------------------+
|                    3|
+---------------------+



In [None]:
peopleDF.select(approxCountDistinct("pcode", 0.1)).show()



+----------------------------+
|approx_count_distinct(pcode)|
+----------------------------+
|                           3|
+----------------------------+



In [None]:
peopleDF.select(first("age"), last("age")).show()

+----------+---------+
|first(age)|last(age)|
+----------+---------+
|        52|       48|
+----------+---------+



In [None]:
peopleDF.select(min("age"), max("age")).show()

+--------+--------+
|min(age)|max(age)|
+--------+--------+
|      28|      52|
+--------+--------+



In [None]:
peopleDF.select(sum("age")).show()

+--------+
|sum(age)|
+--------+
|   209.0|
+--------+



In [None]:
peopleDF.select(avg("age")).show()

+--------+
|avg(age)|
+--------+
|    41.8|
+--------+



In [None]:
peopleDF.select(stddev("age")).show()

+------------------+
|  stddev_samp(age)|
+------------------+
|10.963576058932595|
+------------------+



In [None]:
peopleDF.select(
    min("age").alias("min"),
    max("age").alias("max"),
    sum("age").alias("sum"),
    avg("age").alias("avg")
    ).show()

+---+---+-----+----+
|min|max|  sum| avg|
+---+---+-----+----+
| 28| 52|209.0|41.8|
+---+---+-----+----+



Group By

In [None]:
peopleDF.show()

+-----+--------+---------+---+
|pcode|lastName|firstName|age|
+-----+--------+---------+---+
|02134|  Hopper|    Grace| 52|
| null|  Turing|     Alan| 32|
|94020|Lovelace|      Ada| 28|
|87501| Babbage|  Charles| 49|
|02134|   Wirth|  Niklaus| 48|
+-----+--------+---------+---+



In [None]:
# select pcode, count(*) from peopleDF group by pcode
peopleDF.groupBy("pcode").count().show()

+-----+-----+
|pcode|count|
+-----+-----+
| null|    1|
|87501|    1|
|94020|    1|
|02134|    2|
+-----+-----+



In [None]:
peopleDF.groupBy("pcode").agg(
    min("age").alias("min_age"),
    max("age").alias("max_age"),
    count("age").alias("count_age"),
    avg("age").alias("avg_age"), 
).show()

+-----+-------+-------+---------+-------+
|pcode|min_age|max_age|count_age|avg_age|
+-----+-------+-------+---------+-------+
| null|     32|     32|        1|   32.0|
|02134|     48|     52|        2|   50.0|
|87501|     49|     49|        1|   49.0|
|94020|     28|     28|        1|   28.0|
+-----+-------+-------+---------+-------+



# Joins

In [None]:
# back to data source
peopleDF.show()
pcodesDF.show()

+-----+--------+---------+---+
|pcode|lastName|firstName|age|
+-----+--------+---------+---+
|02134|  Hopper|    Grace| 52|
| null|  Turing|     Alan| 32|
|94020|Lovelace|      Ada| 28|
|87501| Babbage|  Charles| 49|
|02134|   Wirth|  Niklaus| 48|
+-----+--------+---------+---+

+-----+---------+-----+
|pcode|     city|state|
+-----+---------+-----+
|02134|   Boston|   MA|
|94020|Palo Alto|   CA|
|87501| Santa Fe|   NM|
|60645|  Chicago|   IL|
+-----+---------+-----+



In [None]:
# inner join
result = peopleDF.join(pcodesDF, "pcode")
result = result.select("lastName", "firstName", "age", "city", "state")
result.show()

+--------+---------+---+---------+-----+
|lastName|firstName|age|     city|state|
+--------+---------+---+---------+-----+
|  Hopper|    Grace| 52|   Boston|   MA|
|Lovelace|      Ada| 28|Palo Alto|   CA|
| Babbage|  Charles| 49| Santa Fe|   NM|
|   Wirth|  Niklaus| 48|   Boston|   MA|
+--------+---------+---+---------+-----+



In [None]:
peopleDF.join(pcodesDF, "pcode", "left_outer").where(col("city").isNull()).show()

+-----+--------+---------+---+----+-----+
|pcode|lastName|firstName|age|city|state|
+-----+--------+---------+---+----+-----+
| null|  Turing|     Alan| 32|null| null|
+-----+--------+---------+---+----+-----+



In [None]:
peopleDF.join(pcodesDF, "pcode", "right_outer").where(col("city").isNotNull()).show()

+-----+--------+---------+----+---------+-----+
|pcode|lastName|firstName| age|     city|state|
+-----+--------+---------+----+---------+-----+
|02134|   Wirth|  Niklaus|  48|   Boston|   MA|
|02134|  Hopper|    Grace|  52|   Boston|   MA|
|94020|Lovelace|      Ada|  28|Palo Alto|   CA|
|87501| Babbage|  Charles|  49| Santa Fe|   NM|
|60645|    null|     null|null|  Chicago|   IL|
+-----+--------+---------+----+---------+-----+



In [None]:
zcodesDF = spark.read.option("header","true").csv("Dataset/zcodes.csv")
zcodesDF.show()

+-----+---------+-----+
|  zip|     city|state|
+-----+---------+-----+
|02134|   Boston|   MA|
|94020|Palo Alto|   CA|
|87501| Santa Fe|   NM|
|60645|  Chicago|   IL|
+-----+---------+-----+



In [None]:
peopleDF.join(zcodesDF, peopleDF["pcode"] == zcodesDF["zip"]).show()

+-----+--------+---------+---+-----+---------+-----+
|pcode|lastName|firstName|age|  zip|     city|state|
+-----+--------+---------+---+-----+---------+-----+
|02134|  Hopper|    Grace| 52|02134|   Boston|   MA|
|94020|Lovelace|      Ada| 28|94020|Palo Alto|   CA|
|87501| Babbage|  Charles| 49|87501| Santa Fe|   NM|
|02134|   Wirth|  Niklaus| 48|02134|   Boston|   MA|
+-----+--------+---------+---+-----+---------+-----+



In [None]:
# select left.*, right.city from pcodesDF as left join zcodesDF as right on left.pcode=right.zip
pcodesDF.alias("left").join(zcodesDF.alias("right"), pcodesDF["pcode"] == zcodesDF["zip"]).select("left.*", "right.city").show()

+-----+---------+-----+---------+
|pcode|     city|state|     city|
+-----+---------+-----+---------+
|02134|   Boston|   MA|   Boston|
|94020|Palo Alto|   CA|Palo Alto|
|87501| Santa Fe|   NM| Santa Fe|
|60645|  Chicago|   IL|  Chicago|
+-----+---------+-----+---------+



# Spark SQL

In [None]:
myDF = spark.read.format("csv"). \
option("inferSchema","true"). \
option("header","true"). \
load("Dataset/cities1_schema.csv")

In [None]:
myDF.show()

+---------+-----+----------+
|     City|State|Population|
+---------+-----+----------+
|   Boston|   MA|      1000|
|Palo Alto|   CA|      2000|
| Santa Fe|   NM|      3000|
|Palo Alto|   CA|      4000|
+---------+-----+----------+



In [None]:
myDF.createOrReplaceTempView("cities")

In [None]:
pcodesDF.show()

+-----+---------+-----+
|pcode|     city|state|
+-----+---------+-----+
|02134|   Boston|   MA|
|94020|Palo Alto|   CA|
|87501| Santa Fe|   NM|
|60645|  Chicago|   IL|
+-----+---------+-----+



In [None]:
pcodesDF.createOrReplaceTempView("pcode")

In [None]:
# SQL can be run over DataFrames that have been registered as a table.
test_sql = spark.sql('''
  SELECT * 
    FROM cities as c join pcode as p ON p.city=c.City
''')

In [None]:
test_sql.show()

+---------+-----+----------+-----+---------+-----+
|     City|State|Population|pcode|     city|state|
+---------+-----+----------+-----+---------+-----+
|   Boston|   MA|      1000|02134|   Boston|   MA|
|Palo Alto|   CA|      2000|94020|Palo Alto|   CA|
| Santa Fe|   NM|      3000|87501| Santa Fe|   NM|
|Palo Alto|   CA|      4000|94020|Palo Alto|   CA|
+---------+-----+----------+-----+---------+-----+

