In [1]:
from pyspark.sql import SparkSession
import getpass
username = getpass.getuser()
spark = SparkSession. \
builder. \
config('spark.ui.port','0'). \
config("spark.sql.warehouse.dir", f"/user/{username}/warehouse"). \
enableHiveSupport(). \
master('yarn'). \
getOrCreate()

In [2]:
orders_schema = "order_id long , order_date string, customer_id long, order_status string"

In [3]:
orders_df = spark.read \
.format("csv") \
.schema(orders_schema) \
.load("/public/trendytech/orders/orders_1gb.csv")

In [4]:
orders_df.show()

+--------+--------------------+-----------+---------------+
|order_id|          order_date|customer_id|   order_status|
+--------+--------------------+-----------+---------------+
|       1|2013-07-25 00:00:...|      11599|         CLOSED|
|       2|2013-07-25 00:00:...|        256|PENDING_PAYMENT|
|       3|2013-07-25 00:00:...|      12111|       COMPLETE|
|       4|2013-07-25 00:00:...|       8827|         CLOSED|
|       5|2013-07-25 00:00:...|      11318|       COMPLETE|
|       6|2013-07-25 00:00:...|       7130|       COMPLETE|
|       7|2013-07-25 00:00:...|       4530|       COMPLETE|
|       8|2013-07-25 00:00:...|       2911|     PROCESSING|
|       9|2013-07-25 00:00:...|       5657|PENDING_PAYMENT|
|      10|2013-07-25 00:00:...|       5648|PENDING_PAYMENT|
|      11|2013-07-25 00:00:...|        918| PAYMENT_REVIEW|
|      12|2013-07-25 00:00:...|       1837|         CLOSED|
|      13|2013-07-25 00:00:...|       9149|PENDING_PAYMENT|
|      14|2013-07-25 00:00:...|       98

In [6]:
orders_df = spark.read \
.format("csv") \
.schema(orders_schema) \
.load("/user/itv009538/writer_demo/sparkwriterdemo1")

In [7]:
orders_df.createOrReplaceTempView("orders")

In [8]:
spark.sql("select count(*) from orders where order_status = 'CLOSED'")

count(1)
2833500


In [11]:
orders_df.write \
.format("csv") \
.mode("overwrite") \
.partitionBy("order_status") \
.option("path", "/user/itv009538/partition_demo/partition_demo_output1") \
.save()

In [12]:
orders_df.rdd.getNumPartitions()

9

In [13]:
orders_df = spark.read \
.format("csv") \
.schema(orders_schema) \
.load("/user/itv009538/partition_demo/partition_demo_output1")

In [14]:
orders_df.createOrReplaceTempView("orders")

In [15]:
spark.sql("select count(*) from orders where order_status = 'CLOSED'")

count(1)
2833500


In [22]:
customers_df = spark.read \
.format("csv") \
.option("inferSchema", True) \
.load("/public/trendytech/retail_db/customers/part-00000")

In [23]:
customers_final_df = customers_df.toDF("customer_id", "customer_fname", "customer_lname", "customer_email", "customer_password", "customer_street", "customer_city", "customer_state", "customer_zipcode")

In [24]:
customers_final_df.write \
.format("parquet")\
.mode("overwrite") \
.partitionBy("customer_state","customer_city") \
.option("path", "/user/itv009538/partition_demo/partition_demo_output2") \
.save()

In [2]:
customers_df = spark.read \
.format("parquet") \
.load("/user/itv009538/partition_demo/partition_demo_output2")

In [3]:
customers_df.show()

+-----------+--------------+--------------+--------------+-----------------+--------------------+----------------+--------------+-------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_zipcode|customer_state|customer_city|
+-----------+--------------+--------------+--------------+-----------------+--------------------+----------------+--------------+-------------+
|          3|           Ann|         Smith|     XXXXXXXXX|        XXXXXXXXX|3422 Blue Pioneer...|             725|            PR|       Caguas|
|          5|        Robert|        Hudson|     XXXXXXXXX|        XXXXXXXXX|10 Crystal River ...|             725|            PR|       Caguas|
|          7|       Melissa|        Wilcox|     XXXXXXXXX|        XXXXXXXXX|9453 High Concession|             725|            PR|       Caguas|
|          9|          Mary|         Perez|     XXXXXXXXX|        XXXXXXXXX| 3616 Quaking Street|             725|            PR|       

In [4]:
customers_df.rdd.getNumPartitions()

19

In [5]:
customers_df.createOrReplaceTempView("customers")

In [6]:
spark.sql("select * from customers where customer_state = 'PR' and customer_id = 19").show()

+-----------+--------------+--------------+--------------+-----------------+--------------------+----------------+--------------+-------------+
|customer_id|customer_fname|customer_lname|customer_email|customer_password|     customer_street|customer_zipcode|customer_state|customer_city|
+-----------+--------------+--------------+--------------+-----------------+--------------------+----------------+--------------+-------------+
|         19|     Stephanie|      Mitchell|     XXXXXXXXX|        XXXXXXXXX|3543 Red Treasure...|             725|            PR|       Caguas|
+-----------+--------------+--------------+--------------+-----------------+--------------------+----------------+--------------+-------------+



In [7]:
spark.sql("select count(*) from customers where customer_city = 'Caguas'").show()

+--------+
|count(1)|
+--------+
|    4584|
+--------+

