In [1]:
from pyspark import SparkContext

In [2]:
# Read the CSV using only its header row: no user-defined schema is supplied and
# no type inference is requested, so every column is loaded as a string.
# The explicit file:// scheme pins the path to the local filesystem (as the
# schema-based read of this same file does) instead of resolving it against
# whatever default filesystem the Spark/Hadoop configuration specifies.
df = spark.read.csv("file:///Users/gautam/Desktop/PySparkExamples/Data/test1.csv", header=True)

In [3]:
# Display the rows of the DataFrame that was read without a schema.
df.show()

+------------+------+-----+----+
|BusinessDate|Burger|Fries|Coke|
+------------+------+-----+----+
|  2020-12-04|   110|   90|  50|
+------------+------+-----+----+



In [4]:
# Inspect the column types; with no schema supplied every column is a string.
df.printSchema()

root
 |-- BusinessDate: string (nullable = true)
 |-- Burger: string (nullable = true)
 |-- Fries: string (nullable = true)
 |-- Coke: string (nullable = true)



In [5]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType, DateType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains

In [6]:
# Explicit schema for the sales CSV: a date column followed by three
# nullable double columns, one per menu item.
schema = StructType([
    StructField("BusinessDate", DateType(), True),
    StructField("Burger", DoubleType(), True),
    StructField("Fries", DoubleType(), True),
    StructField("Coke", DoubleType(), True),
])

In [7]:
# Read the same CSV again, this time applying the user-defined schema so the
# columns are typed (date / double) instead of plain strings.
df_with_schema = spark.read.csv(
    "file:///Users/gautam/Desktop/PySparkExamples/Data/test1.csv",
    schema=schema,
    header=True,
)

In [8]:
# Display the typed rows; the item columns now render as doubles.
df_with_schema.show()

+------------+------+-----+----+
|BusinessDate|Burger|Fries|Coke|
+------------+------+-----+----+
|  2020-12-04| 110.0| 90.0|50.0|
+------------+------+-----+----+



In [9]:
# Confirm that the user-defined schema (date + doubles) was applied.
df_with_schema.printSchema()

root
 |-- BusinessDate: date (nullable = true)
 |-- Burger: double (nullable = true)
 |-- Fries: double (nullable = true)
 |-- Coke: double (nullable = true)



In [10]:
import pyspark.sql.functions as F

In [11]:
# Keep every existing column and append TotalSales, the row-wise sum of the
# three item columns.
df_final = df_with_schema.select(
    "*",
    (F.col("Burger") + F.col("Fries") + F.col("Coke")).alias("TotalSales"),
)

In [12]:
# Display the rows together with the newly derived TotalSales column.
df_final.show()

+------------+------+-----+----+----------+
|BusinessDate|Burger|Fries|Coke|TotalSales|
+------------+------+-----+----+----------+
|  2020-12-04| 110.0| 90.0|50.0|     250.0|
+------------+------+-----+----+----------+



In [13]:
# Keep only the date and the total for export; the per-item columns
# (Burger, Fries, Coke) are left out.
df_final_to_csv = df_final.select("BusinessDate", "TotalSales")

In [14]:
# Display the export-ready frame (BusinessDate and TotalSales only).
df_final_to_csv.show()

+------------+----------+
|BusinessDate|TotalSales|
+------------+----------+
|  2020-12-04|     250.0|
+------------+----------+



In [15]:
# Check the column types of the frame that is about to be written out.
df_final_to_csv.printSchema()

root
 |-- BusinessDate: date (nullable = true)
 |-- TotalSales: double (nullable = true)



In [16]:
# Collapse to a single partition, then write the result as CSV with a header row,
# replacing any previous contents of the output directory.
df_final_to_csv.repartition(1).write.csv(
    "file:///Users/gautam/Desktop/PySparkExamples/Output/testoutput",
    mode="overwrite",
    header=True,
)

In [17]:
# This use case is complete: a single input file was read and the desired output was produced.

In [18]:
# Next: take all the files in a directory, reusing the same logic and packages as above.

In [19]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType,StructField, StringType, IntegerType,DateType 
from pyspark.sql.types import ArrayType, DoubleType, BooleanType
from pyspark.sql.functions import col,array_contains
import pyspark.sql.functions as F

# Explicit schema shared by every input file: a date column followed by three
# nullable double columns, one per menu item.
schema = StructType([
    StructField("BusinessDate", DateType(), True),
    StructField("Burger", DoubleType(), True),
    StructField("Fries", DoubleType(), True),
    StructField("Coke", DoubleType(), True),
])

# Read all the files in the Data directory with the user-defined schema.
# The explicit file:// scheme keeps this read on the local filesystem, consistent
# with the file:// output path written later in this cell, rather than resolving
# the path against whatever default filesystem the Spark/Hadoop configuration specifies.
df_with_schema=spark.read.format("csv") \
      .option("header", True) \
      .schema(schema) \
      .load("file:///Users/gautam/Desktop/PySparkExamples/Data/")

# Keep every existing column and append TotalSales, the row-wise sum of the
# three item columns.
df_final = df_with_schema.select(
    "*",
    (F.col("Burger") + F.col("Fries") + F.col("Coke")).alias("TotalSales"),
)
# Only the date and the total are exported; the per-item columns are left out.
df_final_to_csv = df_final.select("BusinessDate", "TotalSales")
# Coalesce to one partition and write the per-row totals as CSV with a header row,
# overwriting whatever is already in the output directory.
df_final_to_csv.coalesce(1).write.format("csv") \
 .option("header", True) \
 .mode("overwrite") \
 .save("file:///Users/gautam/Desktop/PySparkExamples/Output/testoutput")

In [20]:
# Confirm the schema of the frame read from the whole directory.
df_with_schema.printSchema()

root
 |-- BusinessDate: date (nullable = true)
 |-- Burger: double (nullable = true)
 |-- Fries: double (nullable = true)
 |-- Coke: double (nullable = true)



In [21]:
# Display the rows loaded from the directory, including the derived TotalSales.
df_final.show()

+------------+------+-----+----+----------+
|BusinessDate|Burger|Fries|Coke|TotalSales|
+------------+------+-----+----+----------+
|  2020-12-04| 150.0|100.0|50.0|     300.0|
|  2020-12-04| 110.0| 90.0|50.0|     250.0|
|  2020-12-03|  50.0| 10.0|50.0|     110.0|
|  2020-12-04|  30.0| 10.0|50.0|      90.0|
+------------+------+-----+----+----------+



In [22]:
# Display the export frame (BusinessDate and TotalSales only).
df_final_to_csv.show()

+------------+----------+
|BusinessDate|TotalSales|
+------------+----------+
|  2020-12-04|     300.0|
|  2020-12-04|     250.0|
|  2020-12-03|     110.0|
|  2020-12-04|      90.0|
+------------+----------+



In [23]:
# Group the per-row totals by business date so that sales can be
# aggregated for each day.
grouped_data = df_final_to_csv.groupBy(F.col("BusinessDate"))

In [24]:
# Sum TotalSales within each BusinessDate group; the resulting column is
# named sum(TotalSales).
df_new = grouped_data.agg({"TotalSales": "sum"})

In [25]:
# Display total sales per business date (one row per date).
df_new.show()

+------------+---------------+
|BusinessDate|sum(TotalSales)|
+------------+---------------+
|  2020-12-03|          110.0|
|  2020-12-04|          640.0|
+------------+---------------+

