# No SQL and Spark query language
# Workshop 4, 31 May 2023


This section covers the set up on Google CoLab:

**1.   Start session**

*   Install PySpark
*   Import packages
*   Instantiate session


**2.   Load inputs**

*   Mount Google Drive
*   Load data
*   Load dataframes

**3.   Structure data**

*   Set schema
*   Print schema
*   Load dataframe with new schema










In [None]:
# (1) Start session

# Install PySpark 
!pip install pyspark
!pip install --ignore-install -q findspark

# Import packages
from pyspark import SparkConf,SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import collections

# Instantiate session
spark = SparkSession.builder.master("local").appName("No SQL and Spark query langugage").config('spark.ui.port', '4050').getOrCreate()

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m1.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.4.0-py2.py3-none-any.whl size=311317130 sha256=9e91a47ad413b7e84460739698c65dd2c206b9673090c1927b55596d4b567ef0
  Stored in directory: /root/.cache/pip/wheels/7b/1b/4b/3363a1d04368e7ff0d408e57ff57966fcdf00583774e761327
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.4.0


In [None]:
# (2) Load inputs

# Mount Google drive
from google.colab import drive
drive.mount('/content/drive', force_remount=True)

# Load data (inputFilePath is used from here till Example 16)
inputFilePath = '/content/drive/MyDrive/BEAD_DATA-master/Customer.csv'
inputFilePath1 = '/content/drive/MyDrive/BEAD_DATA-master/Producers.json'
customerFilePath = '/content/drive/MyDrive/BEAD_DATA-master/Customer.csv'
countryFilePath = '/content/drive/MyDrive/BEAD_DATA-master/Country.csv'

# Load dataframes
df = ( spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(inputFilePath) )

df1 = ( spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(inputFilePath1) )

dfCustomer = ( spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(customerFilePath) )

dfCountry = ( spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(countryFilePath) )

# Show Customer.csv and Producers.json
df.show()
df1.show()

Mounted at /content/drive
+----------+--------------------+--------------+---+------+-----------+--------------------+---------+-----------+------------+-----------+
|CustomerID|        CustomerName|MemberCategory|Age|Gender|AmountSpent|             Address|     City|CountryCode|ContactTitle|PhoneNumber|
+----------+--------------------+--------------+---+------+-----------+--------------------+---------+-----------+------------+-----------+
|      1000|        Lou Anna Tan|             A| 29|     F|       4.14|Blk 26, Telok Bla...|Frankfurt|        GER|          Ms|    2732287|
|      1001|      Wong Sook Huey|             A| 37|     F|       67.1|Blk 1007 Teresa V...|Singapore|        SIN|          Ms|    2740975|
|      1002|       Ng Choon Seng|             C| 23|     M|      63.18|Blk 63 Bishan St ...|  Toronto|        CAN|          Mr|    2580742|
|      1003|      Chew Teck Kuan|             C| 63|     M|      64.49|Blk 109 Bedok Nor...|Singapore|        SIN|          Mr|    443

In [None]:
# (3) Structure data

# Set schema
custschema = StructType([
    StructField("Customerid", IntegerType(), True),
    StructField("CustName", StringType(), True),
    StructField("MemCat", StringType(), True),
    StructField("Age", IntegerType(), True),
    StructField("Gender", StringType(), True),
    StructField("AmtSpent", DoubleType(), True),
    StructField("Address", StringType(), True),
    StructField("City", StringType(), True),
    StructField("CountryID", StringType(), True),
    StructField("Title", StringType(), True),
    StructField("PhoneNo", StringType(), True)
    ])

# Print schema
df.printSchema()

# Load dataframe with new schema

df = ( spark.read
      .schema(schema=custschema)
      .csv(inputFilePath) )

df.head()

root
 |-- CustomerID: integer (nullable = true)
 |-- CustomerName: string (nullable = true)
 |-- MemberCategory: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Gender: string (nullable = true)
 |-- AmountSpent: double (nullable = true)
 |-- Address: string (nullable = true)
 |-- City: string (nullable = true)
 |-- CountryCode: string (nullable = true)
 |-- ContactTitle: string (nullable = true)
 |-- PhoneNumber: integer (nullable = true)



Row(Customerid=None, CustName='CustomerName', MemCat='MemberCategory', Age=None, Gender='Gender', AmtSpent=None, Address='Address', City='City', CountryID='CountryCode', Title='ContactTitle', PhoneNo='PhoneNumber')

This section covers Spark Queries:

4.   Data operations

*  Return selected fields (Example 2)
*  Order by (Example 3)
*  Select retrieval (Example 4)
*  Combine functions (Example 5)
*  Aggregation and simple statistical queries (Examples 6 - 9)
*  Complex and further statistical queries (Examples 10 - 16)
*  Multiple entities (Example 17)
*  Multiple sources (Examples 18 - 19)
*  Write (Example 20)

In [None]:
# Return selected fields (Example 2)

# Return CustomerID, CustName, Age for first 200 rows
df.select("CustomerID", "CustName", "Age").show(200, False)

+----------+----------------------+----+
|CustomerID|CustName              |Age |
+----------+----------------------+----+
|null      |CustomerName          |null|
|1000      |Lou Anna Tan          |29  |
|1001      |Wong Sook Huey        |37  |
|1002      |Ng Choon Seng         |23  |
|1003      |Chew Teck Kuan        |63  |
|1111      |Steven Ou             |61  |
|1634      |Sridharan Jayanthi    |55  |
|1681      |Terence Lim           |30  |
|1810      |Vanessa Ong           |32  |
|1811      |Koh Ting Ting         |57  |
|1818      |Chionh Choon Lee      |57  |
|2131      |Jon                   |64  |
|2233      |Too Siew Hong         |35  |
|2270      |Chao Tah Jin Alex     |22  |
|2323      |Richard Kwan          |26  |
|2345      |Ng Teck Kie Anthony   |56  |
|2626      |Steven Teo            |56  |
|2669      |Boh Lee Ming Lynn     |23  |
|2688      |Kathleen Loh Swat Hong|38  |
|2741      |Goh Chee Eng          |45  |
|2820      |Ng Wee Hock John      |56  |
|2828      |Cher

In [None]:
# Order by (Example 3)

# Order by CustName
df.orderBy("CustName").show()

# Order by CustName and in descending order
df.orderBy(desc("CustName"))

# Order by MemCat then CustName
df.orderBy("MemCat","CustName").show()

+----------+--------------------+--------------+----+------+--------+--------------------+---------+-----------+------------+-----------+
|Customerid|            CustName|        MemCat| Age|Gender|AmtSpent|             Address|     City|  CountryID|       Title|    PhoneNo|
+----------+--------------------+--------------+----+------+--------+--------------------+---------+-----------+------------+-----------+
|      4567|         Abdul Zaidi|             A|  29|     M|   31.65|Blk 24, Telok Bla...|Singapore|        SIN|          Mr|    2709466|
|      5489|        Ang Kim Beng|             A|  45|     M|   32.09|Blk 208 Bedok Nor...|Singapore|        SIN|          Mr|    4486578|
|      2669|   Boh Lee Ming Lynn|             C|  23|     F|   41.99|Blk 671, Woodland...|Singapore|        SIN|          Ms|    2234567|
|      8080|      Chan Chin Fung|             B|  56|     M|   24.95|6 Dover Rise, #17...|Singapore|        SIN|          Mr|    8738529|
|      2270|   Chao Tah Jin Alex| 

In [None]:
# Select retrieval (Example 4)

# Retrieve Customers where MemCat = A
df.filter("MemCat = 'A'").show()

# Retrieve Customers where MemCat = A and name starts with T
df.filter ("MemCat = 'A' AND CustName LIKE 'T%'").show()

+----------+--------------------+------+---+------+--------+--------------------+---------+---------+-----+-------+
|Customerid|            CustName|MemCat|Age|Gender|AmtSpent|             Address|     City|CountryID|Title|PhoneNo|
+----------+--------------------+------+---+------+--------+--------------------+---------+---------+-----+-------+
|      1000|        Lou Anna Tan|     A| 29|     F|    4.14|Blk 26, Telok Bla...|Frankfurt|      GER|   Ms|2732287|
|      1001|      Wong Sook Huey|     A| 37|     F|    67.1|Blk 1007 Teresa V...|Singapore|      SIN|   Ms|2740975|
|      1634|  Sridharan Jayanthi|     A| 55|     F|   61.51|Blk 232, Jurong E...|Singapore|      SIN|   Ms|6658037|
|      1818|    Chionh Choon Lee|     A| 57|     M|    7.13|Blk 89, Zion Road...|Singapore|      SIN|   Mr|7333100|
|      2131|                 Jon|     A| 64|     F|   83.45|Block 88 Demsey R...|   Zurich|      SWZ|   Ms|7654321|
|      2323|        Richard Kwan|     A| 26|     M|   89.52|Blk 27, Mari

In [None]:
# Combine functions (Example 5)

# Retrieve Customers where MemCat = A and in ascending order
df.filter("MemCat = 'A'").orderBy("CustName").show(300,False)

+----------+----------------------+------+---+------+--------+-------------------------------------------------------------+---------+---------+-----+-------+
|Customerid|CustName              |MemCat|Age|Gender|AmtSpent|Address                                                      |City     |CountryID|Title|PhoneNo|
+----------+----------------------+------+---+------+--------+-------------------------------------------------------------+---------+---------+-----+-------+
|4567      |Abdul Zaidi           |A     |29 |M     |31.65   |Blk 24, Telok Blangah Crescent, #14-14, Singapore 0409       |Singapore|SIN      |Mr   |2709466|
|5489      |Ang Kim Beng          |A     |45 |M     |32.09   |Blk 208 Bedok North St 3 #12-234 Singapore 1644              |Singapore|SIN      |Mr   |4486578|
|2983      |Cheryl Tan            |A     |37 |F     |13.99   |Blk 84, Telok Blangah Heights, #06-323, Singapore 1004       |Singapore|SIN      |Ms   |2789967|
|1818      |Chionh Choon Lee      |A     |57 |

In [None]:
# Aggregation and simple statistical queries (Examples 6 - 9)

# Count Customers where MemCat = A
count = df.filter("MemCat = 'A'").count()
print(count)

# Sum AmtSpent for all Customers
tot = df.agg(sum("AmtSpent")).first()[0]
print(tot)

# Sum AmtSpent for Custoemrs where MemCat = A
tot = df.filter("MemCat ='A'").agg(sum("AmtSpent")).first()[0]
print(tot)

# Mean Age of all Customers
tot = df.agg(avg("Age")).first()[0]
print(tot)

26
2486.309999999999
1194.66
41.82


In [None]:
# Complex and further statistical queries (Examples 10 - 16)

# GroupBy - Sum AmtSpent for Customers by MemCat
df.groupBy("MemCat").sum("AmtSpent").show(200,False)

# GroupBy - Sum AmtSpent for Customers by MemCat and Gender
( df.groupBy("MemCat","Gender") 
  .sum("AmtSpent") 
  .orderBy("MemCat","Gender") 
  .show(200,False) )

# Roll up - Sum AmtSpent for Customers by MemCat and Gender
( df.rollup("MemCat","Gender") 
  .sum("AmtSpent") 
  .orderBy("MemCat","Gender") 
  .show(200,False) )

# Cube - Sum AmtSpent for Customers by MemCat and Gender
( df.cube("MemCat","Gender") 
  .sum("AmtSpent") 
  .orderBy("MemCat","Gender") 
  .show(200,False) )

# Standard deviation of AmtSpent
std = df.agg(stddev_pop("AmtSpent")).first()[0]
print(std)

# Skewness
skw = df.agg(skewness("AmtSpent")).first()[0]
print(skw)

# Describe commmon statistics of MemCat, Gender, AmtSpent, and Age
df.describe("MemCat", "Gender", "AmtSpent", "Age").show()

+--------------+------------------+
|MemCat        |sum(AmtSpent)     |
+--------------+------------------+
|B             |500.40999999999997|
|MemberCategory|null              |
|C             |791.24            |
|A             |1194.66           |
+--------------+------------------+

+--------------+------+-----------------+
|MemCat        |Gender|sum(AmtSpent)    |
+--------------+------+-----------------+
|A             |F     |472.2199999999999|
|A             |M     |722.4399999999998|
|B             |F     |216.48           |
|B             |M     |283.93           |
|C             |F     |401.51           |
|C             |M     |389.73           |
|MemberCategory|Gender|null             |
+--------------+------+-----------------+

+--------------+------+------------------+
|MemCat        |Gender|sum(AmtSpent)     |
+--------------+------+------------------+
|null          |null  |2486.309999999999 |
|A             |null  |1194.66           |
|A             |F     |472.219999

In [None]:
# Multiple entities (Example 17)

# Join dfCustomer and dfCountry dataframes
joinDF = dfCustomer.join(dfCountry, "CountryCode")
( joinDF.select("CustomerID", "CustomerName", "CountryCode",
                "CountryName", "Currency", "TimeZone")
  .show(300, False) ) 

+----------+----------------------+-----------+-----------+--------+--------+
|CustomerID|CustomerName          |CountryCode|CountryName|Currency|TimeZone|
+----------+----------------------+-----------+-----------+--------+--------+
|1000      |Lou Anna Tan          |GER        |Germany    |EUR     |2       |
|1001      |Wong Sook Huey        |SIN        |Singapore  |SGD     |8       |
|1002      |Ng Choon Seng         |CAN        |Canada     |CND     |-6      |
|1003      |Chew Teck Kuan        |SIN        |Singapore  |SGD     |8       |
|1111      |Steven Ou             |BRA        |Brazil     |BRL     |-3      |
|1634      |Sridharan Jayanthi    |SIN        |Singapore  |SGD     |8       |
|1681      |Terence Lim           |SLR        |Srilanka   |Srp     |0       |
|1810      |Vanessa Ong           |SIN        |Singapore  |SGD     |8       |
|1811      |Koh Ting Ting         |SIN        |Singapore  |SGD     |8       |
|1818      |Chionh Choon Lee      |SIN        |Singapore  |SGD  

In [None]:
# Multiple sources (Examples 18 - 19)

# Json
inputFilePath1 = '/content/drive/MyDrive/BEAD_DATA-master/Producers.json'

# MySQL <need to amend for Google CoLab>
df = ( spark.read.format("jdbc")
. options(url="jdbc:mysql://localhost/videoshop",
  driver="com.mysql.jdbc.Driver",
  dbtable="Movies",
  user="venkat",
  password="P@ssw0rd1").load() )

Py4JJavaError: ignored

In [None]:
# Write (Example 20)

# Write csv to json with system-generated file name
dfCountry.write.json('/content/drive/MyDrive/BEAD_DATA-master/CountryOUT')

AnalysisException: ignored

This section covers practice exercises:


5.  Data retrieval using Spark SQL

*   Load Movies.csv
*   Retrieve Movies in ascending order of MovieTitle
*   Retrieve only VideoCode, MovieTitle, and MovieType where Rating = R
*   Retrieve Movies where Movie Type = Sci-fi and ProducerID = Warner

6.  Aggregation and statistical queries

*   Mean, variance, standard deviation, and skewness of RentalPrice
*   Groupby - Sum TotalStock by MovieType and Rating
*   Rollup and cube - Sum TotalStock by MovieType and Rating

 
7.  JSON files

*   Load Producers.JSON
*   Retrieve producers where Location = UK
*   Write Movies.csv into JSON

8.  Join multiple entities and formats in a dataframe

*   Join Producers.json and Movies.csv
*   Retrieve only ProducerCode, ProducerName, Location, MovieTitle, and MovieType where ProducerCode = Walt



In [None]:
# (5) Data retrieval using Spark SQL

# Load Movies.csv
moviesFilePath = '/content/drive/MyDrive/BEAD_DATA-master/Movies.csv'
dfMovie = ( spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(moviesFilePath) )
dfMovie.head()
dfMovie.show()

# Retrieve Movies in ascending order of MovieTitle
dfMovie.orderBy("MovieTitle")

# Retrieve only VideoCode, MovieTitle, and MovieType where Rating = R
dfMovie["VideoCode","MovieTitle","MovieType"].filter("Rating = 'R'").show()

# Retrieve Movies where MovieType = Sci-fi and ProducerID = Warner
dfMovie.filter ("ProducerID = 'Warner' AND MovieType = 'Sci-fi'").show()

+---------+--------------------+---------+------+-----------+----------+---------------+-------+----------+------------+
|VideoCode|          MovieTitle|MovieType|Rating|RentalPrice|ProducerID|       Director|  Media|TotalStock|NumberRented|
+---------+--------------------+---------+------+-----------+----------+---------------+-------+----------+------------+
|        1|Star Trek 3: Sear...|   Sci-fi|    PG|        1.5|    Warner|        L Nimoy|    DVD|         7|           1|
|        2|Star Trek 4: The ...|   Sci-fi|    PG|        1.5| Universal|    Abrahams JJ|    DVD|         0|           0|
|        3|Star Trek 5: The ...|   Sci-fi|    PG|        1.5|     Pixar|William Shatner|    DVD|         3|           1|
|        4|      Demolition Man|   Action|     R|      344.0| Universal|        Marco B|    DVD|         3|           3|
|        5|             Nemesis|   Action|     R|        1.5| Universal|   Stuart Baird|    DVD|         4|           0|
|        6|        Full Eclipse|

In [None]:
# (6) Aggregation and statistical queries

# Mean, variance, standard deviation, and skewness of RentalPrice
mean = dfMovie.agg(avg("RentalPrice")).first()[0]
print(mean)
var = dfMovie.agg(variance("RentalPrice")).first()[0]
print(var)
std = dfMovie.agg(stddev_pop("RentalPrice")).first()[0]
print(std)
skew = dfMovie.agg(skewness("RentalPrice")).first()[0]
print(skew)

# Groupby - Sum TotalStock by MovieType and Rating
( dfMovie.groupBy("MovieType","Rating") 
  .sum("TotalStock") 
  .orderBy("MovieType","Rating") 
  .show(200,False) )

# Rollup and cube - Sum TotalStock by MovieType and Rating
( dfMovie.rollup("MovieType","Rating") 
  .sum("TotalStock") 
  .orderBy("MovieType","Rating") 
  .show(200,False) )
( dfMovie.cube("MovieType","Rating") 
  .sum("TotalStock") 
  .orderBy("MovieType","Rating") 
  .show(200,False) )

4.078525641025641
745.7598910668656
27.26480579438911
12.367152971126352
+----------+------+---------------+
|MovieType |Rating|sum(TotalStock)|
+----------+------+---------------+
|Action    |PG    |71             |
|Action    |R     |189            |
|Action    |U     |17             |
|Adventure |PG    |17             |
|Adventure |R     |22             |
|Animated  |U     |12             |
|Animation |PG    |5              |
|Animation |R     |6              |
|Animation |U     |21             |
|Comedy    |PG    |114            |
|Comedy    |R     |75             |
|Comedy    |U     |6              |
|Drama     |PG    |190            |
|Drama     |R     |214            |
|Drama     |U     |1              |
|Horror    |R     |20             |
|Martial Ar|R     |0              |
|Sci-fi    |PG    |76             |
|Sci-fi    |R     |23             |
|Sci-fi    |U     |6              |
|War       |PG    |13             |
+----------+------+---------------+

+----------+------+-------

In [None]:
# (7)  JSON files

# Load Producers.JSON (completed in Step (2))
df1.show(truncate = False)

# Retrieve producers where Location = UK
dfProducers = spark.createDataFrame(
    [
        (1, "20th", "20th Century Fox Productions", "UK"), 
        (2, "Columbia", "Columbia Pictures Production", "UK"),
        (3, "George", "George Lucas Production", "USA"),
        (4, "Pixar", "Pixar Entertainment", "USA"),
        (5, "Raintree", "RainTree Pictures", "SIN"),
        (6, "Walt", "Walt Disney Studio", "USA"),
        (7, "Warner", "Warner Brothers Production", "USA"),     
    ],
    ["S/N", "ProducerCode","ProducerName","Location"]
)
dfProducers.filter("Location = 'UK'").show()

#  Write Movies.csv into JSON
moviesFilePath = '/content/drive/MyDrive/BEAD_DATA-master/Movies.csv'
dfMovies = ( spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .csv(moviesFilePath) )
dfMovies.write.json('/content/drive/MyDrive/BEAD_DATA-master/MoviesOUT')

+---------------------------+----------------------------------------------+-------------------+
|{"ProducerCode":"20th"     | "ProducerName":"20th Century Fox Productions"| "Location": "UK"} |
+---------------------------+----------------------------------------------+-------------------+
|{"ProducerCode":"Columbia" | "ProducerName":"Columbia Pictures Production"| "Location": "UK"} |
|{"ProducerCode":"George"   | "ProducerName":"George Lucas Production"     | "Location": "USA"}|
|{"ProducerCode":"Pixar"    | "ProducerName":"Pixar Entertainment"         | "Location": "USA"}|
|{"ProducerCode":"Raintree" | "ProducerName":"RainTree Pictures"           | "Location": "SIN"}|
|{"ProducerCode":"Universal"| "ProducerName":"Universal Studio Productions"| "Location": "UK"} |
|{"ProducerCode":"Walt"     | "ProducerName":"Walt Disney Studio"          | "Location": "USA"}|
|{"ProducerCode":"Warner"   | "ProducerName":"Warner Brothers Production"  | "Location": "USA"}|
+---------------------------+-

AnalysisException: ignored

In [None]:
# (8)  Join multiple entities and formats in a dataframe

# Join Producers.json and Movies.csv
dfMovies1 = dfMovies.withColumnRenamed("ProducerID", "ProducerCode")
dfMovies1.head()
joinDF1 = dfMovies1.join(dfProducers,["ProducerCode"], how = "inner").distinct()

# Retrieve only ProducerCode, ProducerName, Location, MovieTitle, and MovieType where ProducerCode = Walt
( joinDF1.select("ProducerCode", "ProducerName", "Location", "MovieTitle", "MovieType",)
  .filter("ProducerCode = 'Walt'")
  .show(300,False) ) 

+------------+------------------+--------+-------------------------------------------+----------+
|ProducerCode|ProducerName      |Location|MovieTitle                                 |MovieType |
+------------+------------------+--------+-------------------------------------------+----------+
|Walt        |Walt Disney Studio|USA     |Pelican Brief                              |Drama     |
|Walt        |Walt Disney Studio|USA     |Casino                                     |Drama     |
|Walt        |Walt Disney Studio|USA     |Thunder Heart                              |Drama     |
|Walt        |Walt Disney Studio|USA     |From Hell to Victory                       |War       |
|Walt        |Walt Disney Studio|USA     |Willow                                     |Drama     |
|Walt        |Walt Disney Studio|USA     |X-Files (Piper Maru)                       |Drama     |
|Walt        |Walt Disney Studio|USA     |Career Opportunities                       |Adventure |
|Walt        |Walt D