In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DateType, TimestampType
from pyspark.sql.functions import *
import os

# Obtain the session used by every cell below: getOrCreate() returns an already-active
# SparkSession if there is one, otherwise it starts a new one named "StaticDataFrame".
spark = (
    SparkSession.builder
    .appName("StaticDataFrame")
    .getOrCreate()
)



In [2]:
# Static DataFrame over sales.csv. The schema is declared explicitly (all columns nullable)
# so Spark does not infer types; the first CSV line is treated as a header row.
# sales_schema is reused by the streaming reader further down.
sales_schema = StructType([
    StructField(column_name, column_type, True)
    for column_name, column_type in [
        ("order_id", StringType()),
        ("product_id", StringType()),
        ("seller_id", IntegerType()),
        ("date", DateType()),
        ("num_pieces_sold", IntegerType()),
        ("bill_raw_text", StringType()),
    ]
])

sales_df = spark.read.csv("sales.csv", schema=sales_schema, header=True)

In [3]:
# Verify that sales_df holds more than 4 million records.
# count() is a Spark action that scans the whole CSV, so run it once and reuse the
# result rather than launching a second full job just to print the number.
sales_count = sales_df.count()
if sales_count > 4000000:
    print(sales_count)
else:
    # The check above is strict (>), so a count of exactly 4 000 000 also lands here.
    print("Count <= 4 000 000")

4000040


In [4]:
# Static subset of sales_df restricted to rows whose seller_id equals 7,
# followed by a preview of its first rows.
sales_seller_7_df = sales_df.where(sales_df["seller_id"] == 7)
sales_seller_7_df.show()

+--------+----------+---------+----------+---------------+--------------------+
|order_id|product_id|seller_id|      date|num_pieces_sold|       bill_raw_text|
+--------+----------+---------+----------+---------------+--------------------+
|   95016|     10292|        7|2020-07-07|             86|ddjxhepqkpwionxlw...|
|   95023|     13642|        7|2020-07-03|              5|hqlmpvnyluuvdxNyl...|
|   95042|     54476|        7|2020-07-10|             75|frecsxijhiwsujzuu...|
|   95065|     86757|        7|2020-07-04|             95|dxrsOgtaxcxcpulcc...|
|   95067|      9749|        7|2020-07-06|             83|unhtoxghdrgvvtemo...|
|   95070|     88175|        7|2020-07-09|             32|azfydzqrzsftwhzqy...|
|   95075|     36251|        7|2020-07-09|             56|rrezcdybsktrdfuvr...|
|   95089|     73423|        7|2020-07-10|             15|noqsnpzdtiobzqqlu...|
|   95092|     57982|        7|2020-07-06|             94|tkaviwhjZlzrtviqq...|
|   95096|      3694|        7|2020-07-0

In [5]:
# Create the "input_data" folder that the streaming reader below loads from.
# exist_ok=True makes the cell idempotent on re-run without a separate existence
# check (which could race with another creator). If a regular *file* named
# input_data is already there, makedirs raises FileExistsError instead of silently
# leaving the notebook without a directory.
os.makedirs("input_data", exist_ok=True)

In [6]:
# Streaming DataFrame over CSV files arriving in input_data/ (same schema as sales.csv):
# duplicate order_id rows are dropped, `date` is cast to a timestamp so it can carry
# a 1-day watermark, and num_pieces_sold is summed per date.
# NOTE(review): dropDuplicates runs before withWatermark and its key omits the event-time
# column, so per the Structured Streaming guide the dedup state retains every order_id ever
# seen — confirm this unbounded state is acceptable for the expected input volume.
input_streaming_df = (
    spark.readStream
    .format("csv")
    .option("header", True)
    .schema(sales_schema)
    .option("maxFilesPerTrigger", 1)  # at most one new file per micro-batch
    .load("input_data")
    .dropDuplicates(["order_id"])
    .withColumn("date", col("date").cast(TimestampType()))
    # Append-mode aggregation needs a watermark on the grouping time column.
    .withWatermark("date", "1 day")
    .groupBy("date")
    # `sum` here is pyspark.sql.functions.sum (brought in by the star import), not builtins.sum.
    .agg(sum("num_pieces_sold").alias("total_num_pieces_sold"))
)

# Verify that the created object is streaming.
print(input_streaming_df.isStreaming)

True


In [7]:
# Start a CSV file sink into output_data/, partitioned by `date`, with a micro-batch
# fired every 10 seconds; progress is checkpointed in checkpoint_location/.
# NOTE(review): `date` was cast to a timestamp upstream, so the partition directory
# names will include a time-of-day component — confirm that layout is intended.
query = (
    input_streaming_df.writeStream
    .outputMode("append")
    .format("csv")
    .options(path="output_data", checkpointLocation="checkpoint_location")
    .partitionBy("date")
    .trigger(processingTime="10 seconds")
    .start()
)

# Get the current status of the object.
print(query.status)


{'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False}


In [8]:
# Write the static seller-7 DataFrame from task 1.3 into the "input_data" folder,
# which the streaming query above reads from, so the stream picks it up as new files.
# Use "append", not "overwrite": overwrite replaces the existing contents of input_data/,
# which can remove files the running stream has already discovered but not yet processed
# (it admits only one file per trigger via maxFilesPerTrigger=1), making a later
# micro-batch fail on a missing file. Append only adds new part files.
(
    sales_seller_7_df.write
    .mode("append")
    .format("csv")
    .option("header", True)
    .option("path", "input_data/")
    .save()
)

NameError: name 'avg_temperature_df' is not defined