In [3]:
import os
# Replace "../../../../" with the actual absolute path to your home directory
home_directory = "../../../"
os.environ["SPARK_HOME"] = os.path.join(home_directory, "spark-3.3.2-bin-hadoop3")

In [4]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('sql_with_spark') \
    .getOrCreate()
spark

24/03/10 16:05:32 WARN Utils: Your hostname, Endiesworld resolves to a loopback address: 127.0.1.1; using 172.22.195.180 instead (on interface eth0)
24/03/10 16:05:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/03/10 16:05:43 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [8]:
df = spark.read \
    .option("header", "true") \
    .csv('ITEMS_ORDERED.csv', inferSchema=True)
df.show(3)

+----------+----------+----------+--------+-----+
|customerid|order_date|      item|quantity|price|
+----------+----------+----------+--------+-----+
|     10330| 30-Jun-99|Pogo stick|       1| 28.0|
|     10101| 30-Jun-99|      Raft|       1| 58.0|
|     10298|  1-Jul-99|Skateboard|       1| 33.0|
+----------+----------+----------+--------+-----+
only showing top 3 rows



In [9]:
df.printSchema()

root
 |-- customerid: integer (nullable = true)
 |-- order_date: string (nullable = true)
 |-- item: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- price: double (nullable = true)



In [10]:
df.registerTempTable('items_ordered')



In [12]:
spark.sql("""
SELECT quantity, max(price)
FROM items_ordered
GROUP BY quantity;
""").show()

+--------+----------+
|quantity|max(price)|
+--------+----------+
|       1|    1250.0|
|       3|     14.75|
|       4|     125.0|
|       2|      88.7|
+--------+----------+



In [13]:
df_2 = spark.read \
    .option("header", "true") \
    .csv('customers.csv', inferSchema=True)
df_2.show(3)

24/03/10 16:29:35 WARN CSVHeaderChecker: CSV header does not conform to the schema.
 Header: customerid, firstname, lastname, city, state, 
 Schema: customerid, firstname, lastname, city, state, _c5
Expected: _c5 but found: 
CSV file: file:///home/endie/Projects/Data_Engineering/Pyspark_work/customers.csv
+----------+---------+--------+----------+----------+----+
|customerid|firstname|lastname|      city|     state| _c5|
+----------+---------+--------+----------+----------+----+
|     10101|     John|    Gray|    Lynden|Washington|null|
|     10298|    Leroy|   Brown|   Pinetop|   Arizona|null|
|     10299|    Elroy|  Keller|Snoqualmie|Washington|null|
+----------+---------+--------+----------+----------+----+
only showing top 3 rows



In [16]:
columns = ['customerid', 'firstname', 'lastname', 'city', 'state']
df_2 = df_2.select(columns) 
df_2.show(2)

+----------+---------+--------+-------+----------+
|customerid|firstname|lastname|   city|     state|
+----------+---------+--------+-------+----------+
|     10101|     John|    Gray| Lynden|Washington|
|     10298|    Leroy|   Brown|Pinetop|   Arizona|
+----------+---------+--------+-------+----------+
only showing top 2 rows



In [17]:
df_2.registerTempTable('customers')

## Review Exercises

*How many people are in each unique state in the customers table? Select the state and display the number of people in each. Hint: count is used to count rows in a column, sum works on numeric data only.*

In [22]:
spark.sql("""
SELECT COUNT(customerid) AS state_customer, state
FROM customers
GROUP BY state
""").show()


+--------------+--------------+
|state_customer|         state|
+--------------+--------------+
|             1|        Hawaii|
|             2|        Oregon|
|             2|    Washington|
|             1|North Carolina|
|             6|       Arizona|
|             1|         Idaho|
|             1|South Carolina|
|             1|     Wisconsin|
|             2|      Colorado|
+--------------+--------------+



*From the items_ordered table, select the item, maximum price, and minimum price for each specific item in the table. Hint: The items will need to be broken up into separate groups.*

In [26]:
spark.sql("""
SELECT MAX(price) AS max_price, MIN(price) AS min_price, item
FROM items_ordered
GROUP BY item
""").show()

+---------+---------+------------+
|max_price|min_price|        item|
+---------+---------+------------+
|     33.0|     33.0|  Skateboard|
|    125.0|    125.0|   Life Vest|
|     40.0|     40.0|Canoe paddle|
|     28.0|     28.0|  Pogo stick|
|     6.75|      4.5|    Umbrella|
|    89.22|     88.7|Sleeping Bag|
|    380.5|    380.5|     Bicycle|
|      8.0|      8.0|     Compass|
|     18.3|     18.3|   Rain Coat|
|    16.75|    16.75|      Shovel|
|     22.0|     22.0|      Helmet|
|     58.0|     58.0|        Raft|
|     32.0|     32.0|   Lawnchair|
|     88.0|    79.99|        Tent|
|    280.0|    280.0|       Canoe|
|     12.5|     12.5|   Ear Muffs|
|     29.0|     16.0|     Lantern|
|    192.5|   180.79|    Unicycle|
|     28.0|      4.5|  Flashlight|
|   1250.0|   1250.0|   Parachute|
+---------+---------+------------+
only showing top 20 rows



*How many orders did each customer make? Use the items_ordered table. Select the customerid, number of orders they made, and the sum of their orders. Click the Group By answers link below if you have any problems.*

In [28]:
spark.sql("""
SELECT customerid, COUNT(order_date) AS nos_order, SUM(price) AS total_cost
FROM items_ordered
GROUP BY customerid
""").show()

+----------+---------+----------+
|customerid|nos_order|total_cost|
+----------+---------+----------+
|     10449|        6|    930.79|
|     10410|        2|    281.72|
|     10339|        1|       4.5|
|     10101|        6|    320.75|
|     10315|        1|       8.0|
|     10438|        3|     95.24|
|     10330|        3|     72.75|
|     10299|        2|    1288.0|
|     10413|        1|      32.0|
|     10298|        5|    118.88|
|     10439|        2|     113.5|
+----------+---------+----------+



In [29]:
spark.stop()