# Create spark session

In [2]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Session settings: the application name and the port used for the Spark UI.
my_conf = (
    SparkConf()
    .set("spark.app.name", "My Application")
    .set("spark.ui.port", "4050")
)

# Build the session from the configuration above (or pick up an existing one).
spark = SparkSession.builder.config(conf=my_conf).getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [3]:
# Raise the log threshold from the default "WARN" to "ERROR" to keep cell output quiet.
spark.sparkContext.setLogLevel("ERROR")

In [4]:
# S3 location of the orders CSV that the following cells load.
DATASET_PATH = "s3://data-engg-suman/dataset/orders.csv"

In [5]:
# First load the data in the DF 

from pyspark.sql.types import StructField, StructType, IntegerType, StringType, FloatType, DateType

# Declare the column names and types up front instead of relying on inference.
orders_data_schema = (
    StructType()
    .add('order_id', IntegerType())
    .add('order_date', DateType())
    .add('order_customer_id', IntegerType())
    .add('order_status', StringType())
)

# Load the CSV with the explicit schema; the first line of the file is a header.
orders_data_df = (
    spark.read
    .format('csv')
    .option('header', True)
    .schema(orders_data_schema)
    .load(DATASET_PATH)
)

In [6]:
# Use the writer API to sink the data as Parquet
import os  # not needed by this cell any more; kept in case other cells rely on it

OUTPUT_PATH = "s3://data-engg-suman/processed_data2"
# Join with an explicit "/": os.path.join uses the host OS separator (a
# backslash on Windows), which would produce an invalid S3 URI.
OUTPUT_FILE_PATH = OUTPUT_PATH + "/orders_data"

# mode('overwrite') replaces anything that already exists at OUTPUT_FILE_PATH.
(
    orders_data_df
    .write
    .format('parquet')
    .mode('overwrite')
    .option('path', OUTPUT_FILE_PATH)
    .save()
)

                                                                                

In [7]:
# Read the Parquet output back so the written data can be inspected.
orders_data_df_parquet = spark.read.format('parquet').load(OUTPUT_FILE_PATH)

In [8]:
# Preview the rows that were read back from the Parquet output.
orders_data_df_parquet.show()

+--------+----------+-----------------+---------------+
|order_id|order_date|order_customer_id|   order_status|
+--------+----------+-----------------+---------------+
|       1|2013-07-25|            11599|         CLOSED|
|       2|2013-07-25|              256|PENDING_PAYMENT|
|       3|2013-07-25|            12111|       COMPLETE|
|       4|2013-07-25|             8827|         CLOSED|
|       5|2013-07-25|            11318|       COMPLETE|
|       6|2013-07-25|             7130|       COMPLETE|
|       7|2013-07-25|             4530|       COMPLETE|
|       8|2013-07-25|             2911|     PROCESSING|
|       9|2013-07-25|             5657|PENDING_PAYMENT|
|      10|2013-07-25|             5648|PENDING_PAYMENT|
|      11|2013-07-25|              918| PAYMENT_REVIEW|
|      12|2013-07-25|             1837|         CLOSED|
|      13|2013-07-25|             9149|PENDING_PAYMENT|
|      14|2013-07-25|             9842|     PROCESSING|
|      15|2013-07-25|             2568|       CO

In [9]:
# Number of partitions backing the DataFrame read from Parquet
orders_data_df_parquet.rdd.getNumPartitions()

1

In [10]:
# Redistribute the CSV-backed orders DataFrame into 4 partitions.
repart_orders_data_df = orders_data_df.repartition(4)

In [11]:
# Overwrite the Parquet output, this time splitting the files by order_status
# via partitionBy.
(
    repart_orders_data_df.write
    .format('parquet')
    .mode('overwrite')
    .partitionBy('order_status')
    .save(OUTPUT_FILE_PATH)
)

                                                                                

In [12]:
# Confirm the repartitioned DataFrame really has the requested partition count.
repart_orders_data_df.rdd.getNumPartitions()

4

In [17]:
# Overwrite the output again, limiting how many records go into each file.
# NOTE(review): the documented spelling of this option is 'maxRecordsPerFile';
# the all-lowercase key presumably relies on case-insensitive option lookup —
# confirm for the Spark version in use.
(
    repart_orders_data_df.write
    .format('parquet')
    .mode('overwrite')
    .option('maxrecordsperfile', 20000)
    .save(OUTPUT_FILE_PATH)
)

                                                                                

# SparkSQL

In [18]:
# Expose the DataFrame to Spark SQL as a temporary view named 'orders'.
orders_data_df.createOrReplaceTempView('orders')

In [21]:
# Query the view with plain SQL; the data behind the view is distributed across machines.
spark.sql("SELECT * FROM orders").show()

+--------+----------+-----------------+---------------+
|order_id|order_date|order_customer_id|   order_status|
+--------+----------+-----------------+---------------+
|       1|2013-07-25|            11599|         CLOSED|
|       2|2013-07-25|              256|PENDING_PAYMENT|
|       3|2013-07-25|            12111|       COMPLETE|
|       4|2013-07-25|             8827|         CLOSED|
|       5|2013-07-25|            11318|       COMPLETE|
|       6|2013-07-25|             7130|       COMPLETE|
|       7|2013-07-25|             4530|       COMPLETE|
|       8|2013-07-25|             2911|     PROCESSING|
|       9|2013-07-25|             5657|PENDING_PAYMENT|
|      10|2013-07-25|             5648|PENDING_PAYMENT|
|      11|2013-07-25|              918| PAYMENT_REVIEW|
|      12|2013-07-25|             1837|         CLOSED|
|      13|2013-07-25|             9149|PENDING_PAYMENT|
|      14|2013-07-25|             9842|     PROCESSING|
|      15|2013-07-25|             2568|       CO

In [23]:
# Number of orders per status, most frequent status first.
spark.sql("SELECT order_status, count(*) as count \
            FROM orders \
            GROUP BY order_status \
            ORDER BY count DESC").show()

+---------------+-----+
|   order_status|count|
+---------------+-----+
|       COMPLETE|22899|
|PENDING_PAYMENT|15030|
|     PROCESSING| 8274|
|        PENDING| 7609|
|         CLOSED| 7556|
|        ON_HOLD| 3798|
|SUSPECTED_FRAUD| 1558|
|       CANCELED| 1428|
| PAYMENT_REVIEW|  729|
+---------------+-----+



                                                                                

In [29]:
# Total number of orders in the CLOSED state.
# This groups by order_status (not by customer) and keeps only the CLOSED
# group, so the result is a single row.

spark.sql("SELECT order_status, count(*) as count \
            FROM orders \
            GROUP BY order_status HAVING order_status = 'CLOSED'\
            ORDER BY count DESC").show()

+------------+-----+
|order_status|count|
+------------+-----+
|      CLOSED| 7556|
+------------+-----+



                                                                                

In [31]:
# Number of CLOSED orders placed by each customer, highest count first.
# The WHERE clause filters rows before they are grouped by customer.
spark.sql("SELECT order_customer_id, count(*) as count \
            FROM orders \
            WHERE order_status = 'CLOSED'\
            GROUP BY order_customer_id \
            ORDER BY count DESC").show()

[Stage 33:>                                                         (0 + 1) / 1]

+-----------------+-----+
|order_customer_id|count|
+-----------------+-----+
|             1833|    6|
|             1687|    5|
|             5493|    5|
|             1363|    5|
|             8974|    4|
|             2774|    4|
|             2236|    4|
|             4282|    4|
|             5582|    4|
|            12431|    4|
|             9740|    4|
|             7879|    4|
|             4573|    4|
|             9213|    4|
|             4588|    4|
|            10111|    4|
|             2768|    4|
|             7948|    4|
|             9804|    4|
|             1521|    4|
+-----------------+-----+
only showing top 20 rows



                                                                                

In [32]:
# (Re-)register the 'orders' view from the same DataFrame, then repeat the
# per-status order count.
orders_data_df.createOrReplaceTempView('orders')

spark.sql("SELECT order_status, count(*) as count \
            FROM orders \
            GROUP BY order_status \
            ORDER BY count DESC").show()

[Stage 36:>                                                         (0 + 1) / 1]

+---------------+-----+
|   order_status|count|
+---------------+-----+
|       COMPLETE|22899|
|PENDING_PAYMENT|15030|
|     PROCESSING| 8274|
|        PENDING| 7609|
|         CLOSED| 7556|
|        ON_HOLD| 3798|
|SUSPECTED_FRAUD| 1558|
|       CANCELED| 1428|
| PAYMENT_REVIEW|  729|
+---------------+-----+



                                                                                

# Save the data in the form of table

In [33]:
# Create the 'retail' database unless it already exists.
spark.sql("CREATE DATABASE IF NOT EXISTS retail")

DataFrame[]

In [35]:
# Persist the orders as table 'orders' in the 'retail' database, stored as CSV.
# mode('overwrite') replaces an existing table of the same name.
# The chain is parenthesized so it no longer ends in a dangling "\" that
# continued the statement onto the following blank line.
(
    orders_data_df.write
    .format('csv')
    .mode('overwrite')
    .saveAsTable('retail.orders')
)


                                                                                

In [40]:
# List the tables visible when looking at the 'retail' database.
spark.catalog.listTables('retail')

[Table(name='orders', database='retail', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='orders', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

## Using `bucketBy`

In [44]:
# Rewrite retail.orders as a bucketed table: 3 buckets keyed on
# order_customer_id, with sortBy applied on the same column.
(
    orders_data_df.write
    .format('csv')
    .mode('overwrite')
    .bucketBy(3, 'order_customer_id')
    .sortBy('order_customer_id')
    .saveAsTable('retail.orders')
)

                                                                                

# Transformations

In [53]:
# Point DATASET_PATH at orders_new.csv, which the next cells read as raw text lines.
DATASET_PATH = "s3://data-engg-suman/dataset/orders_new.csv"

In [71]:
# Load the file as an RDD of raw text lines.
rdd = spark.sparkContext.textFile(DATASET_PATH)

In [72]:
# Bring every line back to the driver to look at the raw format.
# collect() returns the whole dataset, so only do this on a small file.
rdd.collect()

['1 2013-07-25\t11599,CLOSED',
 '2 2013-07-25\t256,PENDING_PAYMENT',
 '3 2013-07-25\t12111,COMPLETE',
 '4 2013-07-25\t8827,CLOSED',
 '5 2013-07-25\t11318,COMPLETE',
 '6 2013-07-25\t7130,COMPLETE',
 '7 2013-07-25\t4530,COMPLETE',
 '8 2013-07-25\t2911,PROCESSING',
 '9 2013-07-25\t5657,PENDING_PAYMENT',
 '10 2013-07-25\t5648,PENDING_PAYMENT',
 '11 2013-07-25\t918,PAYMENT_REVIEW',
 '12 2013-07-25\t1837,CLOSED']

In [82]:
# Pattern for one raw line: four whitespace-free capture groups separated by
# a space, a tab and a comma, in that order.
my_regx = r'^(\S+) (\S+)\t(\S+)\,(\S+)'

In [83]:
# Read the same file through the DataFrame API as plain text lines.
df = spark.read.text(DATASET_PATH)

In [84]:
# Display the raw lines without truncating long values.
df.show(truncate=False)

+-----------------------------------+
|value                              |
+-----------------------------------+
|1 2013-07-25\t11599,CLOSED         |
|2 2013-07-25\t256,PENDING_PAYMENT  |
|3 2013-07-25\t12111,COMPLETE       |
|4 2013-07-25\t8827,CLOSED          |
|5 2013-07-25\t11318,COMPLETE       |
|6 2013-07-25\t7130,COMPLETE        |
|7 2013-07-25\t4530,COMPLETE        |
|8 2013-07-25\t2911,PROCESSING      |
|9 2013-07-25\t5657,PENDING_PAYMENT |
|10 2013-07-25\t5648,PENDING_PAYMENT|
|11 2013-07-25\t918,PAYMENT_REVIEW  |
|12 2013-07-25\t1837,CLOSED         |
+-----------------------------------+



In [85]:
from pyspark.sql import functions as F 
# Split each raw line into four named columns: capture group N of my_regx
# becomes the N-th column listed below.
final_df = df.select(
    *(
        F.regexp_extract('value', my_regx, group_idx).alias(col_name)
        for group_idx, col_name in enumerate(
            ("order_id", "date", "customer_id", "status"), start=1
        )
    )
)

In [87]:
# Show the parsed columns produced by the regex extraction.
final_df.show()

+--------+----------+-----------+---------------+
|order_id|      date|customer_id|         status|
+--------+----------+-----------+---------------+
|       1|2013-07-25|      11599|         CLOSED|
|       2|2013-07-25|        256|PENDING_PAYMENT|
|       3|2013-07-25|      12111|       COMPLETE|
|       4|2013-07-25|       8827|         CLOSED|
|       5|2013-07-25|      11318|       COMPLETE|
|       6|2013-07-25|       7130|       COMPLETE|
|       7|2013-07-25|       4530|       COMPLETE|
|       8|2013-07-25|       2911|     PROCESSING|
|       9|2013-07-25|       5657|PENDING_PAYMENT|
|      10|2013-07-25|       5648|PENDING_PAYMENT|
|      11|2013-07-25|        918| PAYMENT_REVIEW|
|      12|2013-07-25|       1837|         CLOSED|
+--------+----------+-----------+---------------+



In [86]:
# Project a single parsed column.
final_df.select('order_id').show()

+--------+
|order_id|
+--------+
|       1|
|       2|
|       3|
|       4|
|       5|
|       6|
|       7|
|       8|
|       9|
|      10|
|      11|
|      12|
+--------+



In [88]:
# Inspect the column types of the parsed DataFrame.
final_df.printSchema()

root
 |-- order_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- status: string (nullable = true)



# Column Object, Column String and Column Expression

In [93]:
# Point DATASET_PATH back at the original orders CSV.
DATASET_PATH = 's3://data-engg-suman/dataset/orders.csv'

In [95]:
# Load the orders CSV again, this time letting Spark infer the column types
# rather than supplying a schema.
df = (
    spark.read
    .format('csv')
    .option('header', True)
    .option('inferSchema', True)
    .load(DATASET_PATH)
)

In [97]:
# Peek at the first two rows.
df.show(2)

+--------+-------------------+-----------------+---------------+
|order_id|         order_date|order_customer_id|   order_status|
+--------+-------------------+-----------------+---------------+
|       1|2013-07-25 00:00:00|            11599|         CLOSED|
|       2|2013-07-25 00:00:00|              256|PENDING_PAYMENT|
+--------+-------------------+-----------------+---------------+
only showing top 2 rows



In [100]:
# Column String: columns are referred to by their names as plain strings.
df.select('order_id', 'order_status').show(2)

+--------+---------------+
|order_id|   order_status|
+--------+---------------+
|       1|         CLOSED|
|       2|PENDING_PAYMENT|
+--------+---------------+
only showing top 2 rows



In [101]:
# Column Object: columns are referred to through F.col(...) / F.column(...)
# instead of bare strings.

from pyspark.sql import functions as F

df.select( F.col('order_id'), F.column('order_status') ).show(2)

+--------+---------------+
|order_id|   order_status|
+--------+---------------+
|       1|         CLOSED|
|       2|PENDING_PAYMENT|
+--------+---------------+
only showing top 2 rows



In [102]:
# Column Object and Column String: both forms can be mixed in one select().

df.select('order_id', F.column('order_status') ).show(2)

+--------+---------------+
|order_id|   order_status|
+--------+---------------+
|       1|         CLOSED|
|       2|PENDING_PAYMENT|
+--------+---------------+
only showing top 2 rows



In [130]:
# Build a derived column: every order_status value with '_STATUS' appended,
# exposed under the name new_order_status.
df.select(
    F.col('order_id'),
    F.concat(F.col('order_status'), F.lit('_STATUS')).alias('new_order_status'),
).show(truncate=False)

+--------+----------------------+
|order_id|new_order_status      |
+--------+----------------------+
|1       |CLOSED_STATUS         |
|2       |PENDING_PAYMENT_STATUS|
|3       |COMPLETE_STATUS       |
|4       |CLOSED_STATUS         |
|5       |COMPLETE_STATUS       |
|6       |COMPLETE_STATUS       |
|7       |COMPLETE_STATUS       |
|8       |PROCESSING_STATUS     |
|9       |PENDING_PAYMENT_STATUS|
|10      |PENDING_PAYMENT_STATUS|
|11      |PAYMENT_REVIEW_STATUS |
|12      |CLOSED_STATUS         |
|13      |PENDING_PAYMENT_STATUS|
|14      |PROCESSING_STATUS     |
|15      |COMPLETE_STATUS       |
|16      |PENDING_PAYMENT_STATUS|
|17      |COMPLETE_STATUS       |
|18      |CLOSED_STATUS         |
|19      |PENDING_PAYMENT_STATUS|
|20      |PROCESSING_STATUS     |
+--------+----------------------+
only showing top 20 rows



# User Defined Functions using Structured API 

In [133]:
# Switch to the headerless 'dataset1' input used for the UDF example.
DATASET_PATH = 's3://data-engg-suman/dataset/dataset1'

# No header row here, so Spark assigns default column names; types are inferred.
df = (
    spark.read
    .format('csv')
    .option('inferSchema', True)
    .option('header', False)
    .load(DATASET_PATH)
)

In [134]:
# Look at the raw rows; the columns carry auto-generated names.
df.show()

+-------+---+---------+
|    _c0|_c1|      _c2|
+-------+---+---------+
|  sumit| 30|bangalore|
|  kapil| 32|hyderabad|
|sathish| 16|  chennai|
|   ravi| 39|bangalore|
| kavita| 12|hyderabad|
|  kavya| 19|   mysore|
+-------+---+---------+



In [135]:
# Check which types were inferred for the columns.
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- _c1: integer (nullable = true)
 |-- _c2: string (nullable = true)



In [136]:
# Give the auto-named columns (_c0, _c1, _c2) meaningful names, in positional order.

df1 = df.toDF('name', 'age', 'city')

df1.show()

+-------+---+---------+
|   name|age|     city|
+-------+---+---------+
|  sumit| 30|bangalore|
|  kapil| 32|hyderabad|
|sathish| 16|  chennai|
|   ravi| 39|bangalore|
| kavita| 12|hyderabad|
|  kavya| 19|   mysore|
+-------+---+---------+



In [137]:
# Confirm the renamed columns and their types.
df1.printSchema()

root
 |-- name: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- city: string (nullable = true)



In [138]:
# Show the renamed data once more before applying the UDF.
df1.show()

+-------+---+---------+
|   name|age|     city|
+-------+---+---------+
|  sumit| 30|bangalore|
|  kapil| 32|hyderabad|
|sathish| 16|  chennai|
|   ravi| 39|bangalore|
| kavita| 12|hyderabad|
|  kavya| 19|   mysore|
+-------+---+---------+



In [139]:
# Age check function
def age_check(age):
    """Flag whether ``age`` is strictly greater than 18.

    Args:
        age: A number of years, or ``None`` when the value is missing
            (for example a null cell when this runs as a Spark UDF).

    Returns:
        ``"Y"`` if ``age > 18``, ``"N"`` if ``age`` is 18 or less, and
        ``None`` when ``age`` is ``None``.
    """
    # Comparing None with an int raises TypeError in Python 3, so a missing
    # age is passed through as missing instead of failing the whole job.
    if age is None:
        return None
    # NOTE(review): the cut-off is exclusive, so exactly 18 yields "N" — confirm intended.
    return "Y" if age > 18 else "N"

In [146]:
# Wrap age_check as a UDF for use in DataFrame expressions; the declared
# return type is string.
parse_age_function = F.udf(f=age_check, returnType=StringType())

In [151]:
# Add an 'Adult' column holding the UDF's result for each row's age.
df2 = df1.withColumn('Adult', parse_age_function(df1['age']))
df2.show()

+-------+---+---------+-----+
|   name|age|     city|Adult|
+-------+---+---------+-----+
|  sumit| 30|bangalore|    Y|
|  kapil| 32|hyderabad|    Y|
|sathish| 16|  chennai|    N|
|   ravi| 39|bangalore|    Y|
| kavita| 12|hyderabad|    N|
|  kavya| 19|   mysore|    Y|
+-------+---+---------+-----+

