In [1]:
import findspark
findspark.init()
import pyspark

Import and initiate findspark to begin with.
And import pyspark

Now start SparkSession

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python Spark SQL example").getOrCreate()

Create DataFrame from data source - csv file

In [3]:
customerDF = spark.read.load("customers.txt", format="csv", sep="\t", inferSchema="true", header="true")

Different types of operations on DataFrames

In [4]:
customerDF.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_zipcode: integer (nullable = true)



In [5]:
customerDF.select("customer_name").show()

+----------------+
|   customer_name|
+----------------+
|     Mary Torres|
|      Jose Haley|
|      Mary Smith|
|  Richard Maddox|
|  Margaret Booth|
|  Mary Henderson|
|     Lisa Walker|
|   Jonathan Hill|
|Carolyn Sheppard|
|    Mary Mendoza|
|   Michael Smith|
|    James Holmes|
|     Mary Dawson|
|    Adam Marquez|
|    Gloria Smith|
|       Mary Webb|
|  Nancy Alvarado|
|  Russell Flores|
|    Denise Smith|
|  Jose Dickerson|
+----------------+
only showing top 20 rows



In [6]:
customerDF.select(customerDF['customer_name'], customerDF['customer_city']).show()

+----------------+-------------+
|   customer_name|customer_city|
+----------------+-------------+
|     Mary Torres|       Caguas|
|      Jose Haley|     Columbus|
|      Mary Smith|      Houston|
|  Richard Maddox|       Caguas|
|  Margaret Booth|    Arlington|
|  Mary Henderson|       Caguas|
|     Lisa Walker|       Caguas|
|   Jonathan Hill|      Phoenix|
|Carolyn Sheppard|Pompano Beach|
|    Mary Mendoza|       Caguas|
|   Michael Smith|       Caguas|
|    James Holmes|     Hilliard|
|     Mary Dawson|       Caguas|
|    Adam Marquez|  San Antonio|
|    Gloria Smith|       Caguas|
|       Mary Webb|   San Marcos|
|  Nancy Alvarado|     Flushing|
|  Russell Flores|       Caguas|
|    Denise Smith|    Rego Park|
|  Jose Dickerson|         Mesa|
+----------------+-------------+
only showing top 20 rows



In [7]:
customerDF.filter(customerDF['customer_state'] == 'CA').show()

+-----------+----------------+---------------+--------------+----------------+
|customer_id|   customer_name|  customer_city|customer_state|customer_zipcode|
+-----------+----------------+---------------+--------------+----------------+
|       5577|      Mary Smith|        Modesto|            CA|           95350|
|       1745|      Mary Smith|Rowland Heights|            CA|           91748|
|      11444|Kathleen Patrick|      San Diego|            CA|           92109|
|       8846|    Thomas Smith|          Indio|            CA|           92201|
|       6237|  Bobby Anderson|       El Cajon|            CA|           92020|
|       4085|       Mary Carr|  Panorama City|            CA|           91402|
|       8705|  Patricia Smith|       Stockton|            CA|           95207|
|       3669|       Mary Soto| San Bernardino|            CA|           92410|
|       6101|      Mary Smith|    Los Angeles|            CA|           90033|
|      11697|  Jessica Thomas|  Laguna Niguel|      

In [8]:
customerDF.groupBy("customer_state").count().show()

+--------------+-----+
|customer_state|count|
+--------------+-----+
|            AZ|   19|
|            SC|    2|
|            LA|    7|
|            MN|    1|
|            NJ|   19|
|            DC|    4|
|            OR|    4|
|            VA|   14|
|            RI|    2|
|            KY|    1|
|            MI|   28|
|            NV|   16|
|            WI|    9|
|            ID|    2|
|            CA|  187|
|            CT|    8|
|            NC|   19|
|            MD|   19|
|            DE|    1|
|            MO|   13|
+--------------+-----+
only showing top 20 rows



Create temp view for running SQL queries on the dataframe

In [9]:
customerDF.createOrReplaceTempView("customers")

SQL can be run on DataFrames that are registered as temp views.

In [10]:
cStateCount50 = spark.sql("SELECT customer_state, count(*) as state_count FROM customers GROUP BY customer_state HAVING state_count>=50")

In [11]:
type(cStateCount50)

pyspark.sql.dataframe.DataFrame

In [12]:
cStateCount50.show()

+--------------+-----------+
|customer_state|state_count|
+--------------+-----------+
|            CA|        187|
|            NY|         79|
|            TX|         62|
|            PR|        505|
+--------------+-----------+



In [13]:
cStateCount50.printSchema()

root
 |-- customer_state: string (nullable = true)
 |-- state_count: long (nullable = false)



In [14]:
cStateCount50.coalesce(1).write.parquet("cStateOutput1.parquet")

Since the dataframe is small we can coalesce all the partitions into one and write it. This will result in a single file output else output will be in as many files as the number of partitions. coalesce can prove expensive on large dataframes.

Alternatively the following syntax is also allowed.
We can use the above since parquet is the default format used by SparkSQL.

cStateCount50.coalesce(1).write.save("cStateOutput2.parquet", format="parquet")

cStateCount50.coalesce(1).write.save("cStateOutput3.parquet")

In [15]:
cStateCount50.coalesce(1).write.json("cStateOutput1.json")

Alternatively the following syntax is also allowed.

cStateCount50.coalesce(1).write.save("cStateOutput2.json", format="json")

Now Create DataFrame from data source - JSON file

In [16]:
productDF = spark.read.load("products.json", format="json")

Alternatively the following syntax is also allowed

productDF = spark.read.json("products.json")

We can now run set of the DataFrame operations

In [17]:
productDF.printSchema()

root
 |-- category_id: long (nullable = true)
 |-- customer_id: long (nullable = true)
 |-- product_category: string (nullable = true)
 |-- product_name: string (nullable = true)
 |-- product_price: double (nullable = true)
 |-- product_quantity: long (nullable = true)
 |-- salestxn_id: long (nullable = true)



In [18]:
productDF.select("product_name").show()

+--------------------+
|        product_name|
+--------------------+
|O'Brien Men's Neo...|
|O'Brien Men's Neo...|
|Under Armour Wome...|
|O'Brien Men's Neo...|
|Pelican Sunstream...|
|Nike Men's CJ Eli...|
|Diamondback Women...|
|Field & Stream Sp...|
|Perfect Fitness P...|
|Nike Men's CJ Eli...|
|Pelican Sunstream...|
|Nike Men's CJ Eli...|
|Diamondback Women...|
|Nike Men's CJ Eli...|
|Nike Men's Dri-FI...|
|O'Brien Men's Neo...|
|O'Brien Men's Neo...|
|Nike Men's Dri-FI...|
|Diamondback Women...|
|Under Armour Girl...|
+--------------------+
only showing top 20 rows



In [19]:
productDF.select(productDF['product_name'], productDF['product_category'], productDF['product_price']).show()

+--------------------+------------------+-------------+
|        product_name|  product_category|product_price|
+--------------------+------------------+-------------+
|O'Brien Men's Neo...|           Fishing|        49.98|
|O'Brien Men's Neo...|           Fishing|        49.98|
|Under Armour Wome...|      Boxing & MMA|        31.99|
|O'Brien Men's Neo...|           Fishing|        49.98|
|Pelican Sunstream...|           Boating|       199.99|
|Nike Men's CJ Eli...|            Cleats|       129.99|
|Diamondback Women...| Bike & Skate Shop|       299.98|
|Field & Stream Sp...|Hunting & Shooting|       399.98|
|Perfect Fitness P...|   As Seen on  TV!|        59.99|
|Nike Men's CJ Eli...|            Cleats|       129.99|
|Pelican Sunstream...|           Boating|       199.99|
|Nike Men's CJ Eli...|            Cleats|       129.99|
|Diamondback Women...| Bike & Skate Shop|       299.98|
|Nike Men's CJ Eli...|            Cleats|       129.99|
|Nike Men's Dri-FI...|     Men's Apparel|       

In [20]:
productDF.filter(productDF['product_price'] > 200.00).show()

+-----------+-----------+------------------+--------------------+-------------+----------------+-----------+
|category_id|customer_id|  product_category|        product_name|product_price|product_quantity|salestxn_id|
+-----------+-----------+------------------+--------------------+-------------+----------------+-----------+
|         42|        702| Bike & Skate Shop|Diamondback Women...|       299.98|               1|     140220|
|         44|       3959|Hunting & Shooting|Field & Stream Sp...|       399.98|               1|      77426|
|         42|       5658| Bike & Skate Shop|Diamondback Women...|       299.98|               1|      84894|
|         42|       9356| Bike & Skate Shop|Diamondback Women...|       299.98|               1|     102807|
|         42|       8651| Bike & Skate Shop|Diamondback Women...|       299.98|               1|     134324|
|         44|      12072|Hunting & Shooting|Field & Stream Sp...|       399.98|               1|      27476|
|         42|      

In [21]:
productDF.groupBy("product_category").count().show()

+-------------------+-----+
|   product_category|count|
+-------------------+-----+
|  Training by Sport|    5|
|   Men's Golf Clubs|   21|
|   Camping & Hiking|   44|
|Fitness Accessories|   47|
|         Golf Shoes|    6|
|         Basketball|   36|
|        Electronics|   48|
|          Team Shop|  162|
|      Men's Apparel| 2085|
|  Bike & Skate Shop| 1377|
|  Golf Bags & Carts|   89|
|    As Seen on  TV!| 2399|
|       Boxing & MMA|  115|
| Hunting & Shooting| 1785|
|Baseball & Softball|    4|
|       Golf Apparel|   51|
| Women's Golf Clubs|   57|
|      Shop By Sport|   26|
|            Fishing| 1953|
|        Accessories|  110|
+-------------------+-----+
only showing top 20 rows



Create temp view for running SQL queries on the dataframe

In [22]:
productDF.createOrReplaceTempView("products")

SQL queries can be run on the DataFrames that have been registered as a table.

In [23]:
prd200 = spark.sql("SELECT category_id, product_category, count(*) as prdcount FROM products WHERE product_price>200 GROUP BY category_id, product_category ORDER BY product_category")

In [24]:
type(prd200)

pyspark.sql.dataframe.DataFrame

In [25]:
prd200.show()

+-----------+-------------------+--------+
|category_id|   product_category|prdcount|
+-----------+-------------------+--------+
|         40|        Accessories|       7|
|         16|    As Seen on  TV!|       6|
|          3|Baseball & Softball|       4|
|         42|  Bike & Skate Shop|    1377|
|         47|            Boating|       6|
|          9|   Cardio Equipment|       3|
|         37|        Electronics|       9|
|         34|  Golf Bags & Carts|      10|
|         44| Hunting & Shooting|    1785|
|         30|   Men's Golf Clubs|       5|
|         10|  Strength Training|       2|
|          6|   Tennis & Racquet|       3|
+-----------+-------------------+--------+



In [26]:
prd200.printSchema()

root
 |-- category_id: long (nullable = true)
 |-- product_category: string (nullable = true)
 |-- prdcount: long (nullable = false)



In [27]:
prd200.coalesce(1).write.save("product3.parquet")

Alternatively the following syntax is also allowed

prd200.write.save("product2.parquet", format="parquet")

prd200.write.parquet("product1.parquet")

Since parquet is the default format used by SparkSQL we need not specify it as shown in the first command.

In [28]:
prd200.coalesce(1).write.json("product1.json")

Alternatively the following syntax is also allowed

prd200.write.save("product2.json", format="json")

Now that we have two datasets in two views we can join them on the common column for queries. For example:
    
Get the list of customers and product categories in which they bought multiple items (quantity) that are more expensive than 200.00

In [29]:
custlist200 = spark.sql("SELECT a.customer_name, b.product_category, count(*) as prdcount FROM customers a INNER JOIN products b ON a.customer_id=b.customer_id WHERE b.product_price>200.00 GROUP BY a.customer_name, b.product_category HAVING prdcount>1")

In [30]:
type(custlist200)

pyspark.sql.dataframe.DataFrame

In [31]:
custlist200.show()

+-----------------+------------------+--------+
|    customer_name|  product_category|prdcount|
+-----------------+------------------+--------+
|      David Smith|Hunting & Shooting|       2|
|    William Weiss|Hunting & Shooting|       2|
|    William Smith| Bike & Skate Shop|       2|
|    William Smith|Hunting & Shooting|       3|
|       Mary Smith| Bike & Skate Shop|      22|
|   Kimberly Blair|Hunting & Shooting|       2|
|     William Clay|Hunting & Shooting|       2|
|   Margaret Smith|Hunting & Shooting|       2|
|    Russell Smith|Hunting & Shooting|       2|
|       Mary Lopez| Bike & Skate Shop|       2|
|      Louis Novak|Hunting & Shooting|       2|
|      Linda Smith| Bike & Skate Shop|       2|
|       Mary Black| Bike & Skate Shop|       2|
|       Linda Hale| Bike & Skate Shop|       2|
|    Mary Gonzales|Hunting & Shooting|       2|
|   Jesse Matthews|Hunting & Shooting|       2|
|      Mary Daniel| Bike & Skate Shop|       2|
|     Albert Smith|Hunting & Shooting|  

In [32]:
custlist200.printSchema()

root
 |-- customer_name: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- prdcount: long (nullable = false)



Syntax for reading a parquet file and loading as a DF

productDF1 = spark.read.load("products.parquet")

We can use the above since parquet is the default format used by SparkSQL

productDF2 = spark.read.load("products.parquet", format="parquet")

productDF3 = spark.read.parquet("products.parquet")