In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from datetime import datetime

In [3]:
# Create Spark Session
spark = SparkSession.builder\
        .appName("Spark")\
        .enableHiveSupport()\
        .getOrCreate()

In [4]:
# Hardcoded data
data = [
    ["Product A", 1001, datetime.strptime("2023-07-20", "%Y-%m-%d"), datetime.strptime("2023-07-20 10:15:30", "%Y-%m-%d %H:%M:%S"), 29.99],
    ["Product B", 1002, datetime.strptime("2023-07-19", "%Y-%m-%d"), datetime.strptime("2023-07-19 14:20:45", "%Y-%m-%d %H:%M:%S"), 49.99],
    ["Product C", 1003, datetime.strptime("2023-07-18", "%Y-%m-%d"), datetime.strptime("2023-07-18 09:30:15", "%Y-%m-%d %H:%M:%S"), 39.99],
    ["Product D", 1004, datetime.strptime("2023-07-17", "%Y-%m-%d"), datetime.strptime("2023-07-17 16:45:00", "%Y-%m-%d %H:%M:%S"), 19.99]
]

In [5]:
# Define Schema
schema = StructType([
    StructField("Product",StringType(),True),
    StructField("ID",IntegerType(),True),
    StructField("Date",DateType(),True),
    StructField("Timestamp",TimestampType(),True),
    StructField("Price",FloatType(),True)
])

In [7]:
# Create Dataframe
df = spark.createDataFrame(data,schema)

# Print schema
df.printSchema()

# Print data
df.show()

root
 |-- Product: string (nullable = true)
 |-- ID: integer (nullable = true)
 |-- Date: date (nullable = true)
 |-- Timestamp: timestamp (nullable = true)
 |-- Price: float (nullable = true)

+---------+----+----------+-------------------+-----+
|  Product|  ID|      Date|          Timestamp|Price|
+---------+----+----------+-------------------+-----+
|Product A|1001|2023-07-20|2023-07-20 10:15:30|29.99|
|Product B|1002|2023-07-19|2023-07-19 14:20:45|49.99|
|Product C|1003|2023-07-18|2023-07-18 09:30:15|39.99|
|Product D|1004|2023-07-17|2023-07-17 16:45:00|19.99|
+---------+----+----------+-------------------+-----+



In [8]:
# First read example should not infer schema, igonre header row, provide explicit column name and datatype

# Define schema
schema = StructType([
    StructField("order_id",StringType(),True),
    StructField("order_item_id",IntegerType(),True),
    StructField("product_id",StringType(),True),
    StructField("seller_id",StringType(),True),
    StructField("shopping_limit_date",TimestampType(),True),
    StructField("price",DoubleType(),True),
    StructField("freight_value",DoubleType(),True)
])

path = "/content/order_items_dataset.csv"
df = spark.read.format("csv").option("header","true").option("inferSchema","false").schema(schema).load(path)

df.printSchema()

df.show(5)

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shopping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shopping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48

In [9]:
# Second read example should infer schema, ignore header row
df2 = spark.read.format("csv").option("header","true").option("inferSchema","true").load(path)

# print schema and sample data
df2.printSchema()
df2.show(5)

root
 |-- order_id: string (nullable = true)
 |-- order_item_id: integer (nullable = true)
 |-- product_id: string (nullable = true)
 |-- seller_id: string (nullable = true)
 |-- shipping_limit_date: timestamp (nullable = true)
 |-- price: double (nullable = true)
 |-- freight_value: double (nullable = true)

+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|            order_id|order_item_id|          product_id|           seller_id|shipping_limit_date|price|freight_value|
+--------------------+-------------+--------------------+--------------------+-------------------+-----+-------------+
|00010242fe8c5a6d1...|            1|4244733e06e7ecb49...|48436dade18ac8b2b...|2017-09-19 09:45:35| 58.9|        13.29|
|00018f77f2f0320c5...|            1|e5f2d52b802189ee6...|dd7ddc04e1b6c2c61...|2017-05-03 11:05:13|239.9|        19.93|
|000229ec398224ef6...|            1|c777355d18b72b67a...|5b51032eddd242adc...|2018-01-18 14:48

In [11]:
# Number of partitions
print(f"Number of partitions: {df2.rdd.getNumPartitions()}")

df3 = df2.repartition(10)

# Number of partitions after repartition
print(f"Number of partitions after repartition: {df3.rdd.getNumPartitions()}")

Number of partitions: 2
Number of partitions after repartition: 10


In [13]:
# Select columns in different options
from pyspark.sql.functions import *

df3.select("order_id").show(5)
df3.select("order_id","shipping_limit_date").show(5)
df3.select(col("order_id"),col("shipping_limit_date")).show(5)
df3.select(col("order_id").alias("ord_id"),col("shipping_limit_date").alias("limit_date")).show(5)

+--------------------+
|            order_id|
+--------------------+
|6299bb8e855289b41...|
|71fbb9971d84bf97a...|
|74322a01b770c2ea3...|
|a23fc2b3af4f1a48e...|
|747af114bbea56ac1...|
+--------------------+
only showing top 5 rows

+--------------------+-------------------+
|            order_id|shipping_limit_date|
+--------------------+-------------------+
|3bbf8f927f288e4a1...|2017-11-09 14:25:38|
|50c40cfcbb6ce3fca...|2018-06-14 09:52:04|
|51c3d73e0e9052253...|2018-02-22 19:15:27|
|183ee0e3ebd4c1c99...|2018-02-07 20:14:08|
|3a1400b5d4dd3082a...|2018-03-27 17:28:20|
+--------------------+-------------------+
only showing top 5 rows

+--------------------+-------------------+
|            order_id|shipping_limit_date|
+--------------------+-------------------+
|3bbf8f927f288e4a1...|2017-11-09 14:25:38|
|50c40cfcbb6ce3fca...|2018-06-14 09:52:04|
|51c3d73e0e9052253...|2018-02-22 19:15:27|
|183ee0e3ebd4c1c99...|2018-02-07 20:14:08|
|3a1400b5d4dd3082a...|2018-03-27 17:28:20|
+-----------

In [14]:
# Derive new column using withColumn
df4 = df3.withColumn("year",year(col("shipping_limit_date")))\
          .withColumn("month",month(col("shipping_limit_date")))

df4.select("order_id","shipping_limit_date","year","month").show(5)

+--------------------+-------------------+----+-----+
|            order_id|shipping_limit_date|year|month|
+--------------------+-------------------+----+-----+
|3bbf8f927f288e4a1...|2017-11-09 14:25:38|2017|   11|
|50c40cfcbb6ce3fca...|2018-06-14 09:52:04|2018|    6|
|51c3d73e0e9052253...|2018-02-22 19:15:27|2018|    2|
|183ee0e3ebd4c1c99...|2018-02-07 20:14:08|2018|    2|
|3a1400b5d4dd3082a...|2018-03-27 17:28:20|2018|    3|
+--------------------+-------------------+----+-----+
only showing top 5 rows



In [15]:
# Rename existing column using withColumnRenamed
df5 = df4.withColumnRenamed("shipping_limit_date","shipping_limit_datetime")
df5.select("order_id","shipping_limit_datetime").show(5)

+--------------------+-----------------------+
|            order_id|shipping_limit_datetime|
+--------------------+-----------------------+
|3bbf8f927f288e4a1...|    2017-11-09 14:25:38|
|50c40cfcbb6ce3fca...|    2018-06-14 09:52:04|
|51c3d73e0e9052253...|    2018-02-22 19:15:27|
|183ee0e3ebd4c1c99...|    2018-02-07 20:14:08|
|3a1400b5d4dd3082a...|    2018-03-27 17:28:20|
+--------------------+-----------------------+
only showing top 5 rows

