In [None]:
# Install a headless Java 8 JDK (stdout discarded) and download the
# Spark 3.0.1 / Hadoop 2.7 binary tarball from the Apache archive.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.1/spark-3.0.1-bin-hadoop2.7.tgz

In [None]:
# Unpack the Spark tarball into the current working directory, then install
# findspark quietly.
# NOTE(review): findspark is installed without a version pin — consider pinning.
!tar xf spark-3.0.1-bin-hadoop2.7.tgz
!pip install -q findspark

In [None]:
import os

# Export the Java 8 and Spark 3.0.1 install directories through the process
# environment in a single update.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content//spark-3.0.1-bin-hadoop2.7",
})

In [None]:
# Initialise findspark so that `pyspark` can be imported by the cells below.
# Presumably it locates the installation through the SPARK_HOME environment
# variable — confirm against the findspark documentation.
import findspark
findspark.init()


In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import * 

In [None]:
import numpy as np
import pandas as pd
from io import StringIO

In [None]:
# Mount Google Drive at /content/drive; the CSV paths used by the read cells
# all live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import pyspark
from pyspark.sql import SparkSession

In [None]:
# Create the application's SparkSession, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('SparkProject')
    .getOrCreate()
)

In [None]:
# Display the SparkSession object as the cell output.
spark

## 2. Define a schema for each dataset and use it to read the corresponding file as a DataFrame.

In [None]:
from pyspark.sql.types import *


def _nullable_schema(*columns):
    """Build a StructType from (name, data_type) pairs; every field is nullable."""
    return StructType([StructField(name, data_type, True) for name, data_type in columns])


# Order channel code and its channel type.
Schema_OD = _nullable_schema(
    ('Order_channel_code', IntegerType()),
    ('Order_channel_type', StringType()),
)

# Retailer / product / date / quantity records.
Schema_PO = _nullable_schema(
    ('Retailer_code', IntegerType()),
    ('product_number', IntegerType()),
    ('Date', StringType()),
    ('Quantity', IntegerType()),
)

# Product attributes, including the two float-valued unit columns.
Schema_P = _nullable_schema(
    ('product_number', IntegerType()),
    ('Product_line', StringType()),
    ('Product_type', StringType()),
    ('Product', StringType()),
    ('Product_brand', StringType()),
    ('Product_colour', StringType()),
    ('Unit_loss', FloatType()),
    ('Unit_price', FloatType()),
)

# Sales rows: retailer, product, order channel, date, quantity and prices.
Schema_S = _nullable_schema(
    ('Retailer_code', IntegerType()),
    ('Product_Number', IntegerType()),
    ('Order_channel_code', IntegerType()),
    ('Date', StringType()),
    ('Quantity', IntegerType()),
    ('Unit_price', FloatType()),
    ('Unit_sale_price', FloatType()),
)

# Retailer code, name, type and country.
Schema_RD = _nullable_schema(
    ('Retailer_code', IntegerType()),
    ('Retailer_name', StringType()),
    ('Type', StringType()),
    ('Country', StringType()),
)

3. Using Spark SQL, read the raw data from the file system (HDFS) into separate
DataFrames, specifying the schema when reading each file. (10 Marks)

In [None]:
# Read order_channels.csv with the explicit Schema_OD; without a schema the CSV
# reader loads every column as a string.
# NOTE(review): with a user-supplied schema the columns are matched by position,
# so Schema_OD must list its fields in the same order as the file — confirm.
# NOTE(review): this path uses 'MyDrive' while the other reads use 'My Drive' —
# confirm that both resolve on the mounted drive.
OrderChannelsDF = (
    spark.read.format('csv')
    .option('header', 'true')
    .schema(Schema_OD)
    .load('/content/drive/MyDrive/Project/Dataset/Dataset/order_channels.csv')
)

In [None]:
# Preview the order-channels rows (20 is show()'s default row count).
OrderChannelsDF.show(n=20)

In [None]:
# Print the column names, data types and nullability of OrderChannelsDF.
OrderChannelsDF.printSchema()

In [None]:
# Read products_orders.csv with the explicit Schema_PO; without a schema the
# CSV reader loads every column as a string.
# NOTE(review): with a user-supplied schema the columns are matched by position,
# so Schema_PO must list its fields in the same order as the file — confirm.
ProductsOrdersDF = (
    spark.read.format('csv')
    .option('header', 'true')
    .schema(Schema_PO)
    .load('/content/drive/My Drive/Project/Dataset/Dataset/products_orders.csv')
)

In [None]:
# Preview the first five product-order rows.
ProductsOrdersDF.show(n=5)

In [None]:
# Print the column names, data types and nullability of ProductsOrdersDF.
ProductsOrdersDF.printSchema()

In [None]:
# Read products.csv with the explicit Schema_P so Unit_price is a float rather
# than a string (the CSV reader's default without a schema).
# NOTE(review): with a user-supplied schema the columns are matched by position,
# so Schema_P must list its fields in the same order as the file — confirm.
ProductsDF = (
    spark.read.format('csv')
    .option('header', 'true')
    .schema(Schema_P)
    .load('/content/drive/My Drive/Project/Dataset/Dataset/products.csv')
)

In [None]:
# Preview the first five product rows.
ProductsDF.show(n=5)

In [None]:
# Print the column names, data types and nullability of ProductsDF.
ProductsDF.printSchema()

In [None]:
# Read retailers_details.csv with the explicit Schema_RD; without a schema the
# CSV reader loads every column as a string.
# NOTE(review): with a user-supplied schema the columns are matched by position,
# so Schema_RD must list its fields in the same order as the file — confirm.
RetailersDetailsDF = (
    spark.read.format('csv')
    .option('header', 'true')
    .schema(Schema_RD)
    .load('/content/drive/My Drive/Project/Dataset/Dataset/retailers_details.csv')
)

In [None]:
# Preview the first five retailer rows.
RetailersDetailsDF.show(n=5)

In [None]:
# Print the column names, data types and nullability of RetailersDetailsDF.
RetailersDetailsDF.printSchema()

In [None]:
# Read sales.csv with the explicit Schema_S so quantities and prices are numeric
# rather than strings (the CSV reader's default without a schema).
# NOTE(review): with a user-supplied schema the columns are matched by position,
# so Schema_S must list its fields in the same order as the file — confirm.
salesDF = (
    spark.read.format('csv')
    .option('header', 'true')
    .schema(Schema_S)
    .load('/content/drive/My Drive/Project/Dataset/Dataset/sales.csv')
)

In [None]:
# Preview the first five sales rows.
salesDF.show(n=5)

In [None]:
# Print the column names, data types and nullability of salesDF.
salesDF.printSchema()

# 4. Perform a basic analysis of the various DataFrames and store the results on the file system
# Basic analysis includes: select any one DataFrame and show the following
# a. Display the column names (3 Marks)

In [None]:
# List the column names of salesDF.
salesDF.columns

## b. Display the data types of the columns (3 Marks)

In [None]:
# List (column name, data type) pairs for salesDF.
salesDF.dtypes

## c. Find the maximum and minimum values in each column (3 Marks)

In [None]:
from pyspark.sql import functions as F

# Cast the price columns to float before aggregating so the maximum is numeric
# even when the CSV columns were loaded as strings (the reader's default when
# no schema is supplied); on strings max() would compare lexicographically.
# F.max is referenced explicitly instead of relying on a wildcard import that
# shadows Python's builtin max. Aliases keep the original output labels.
salesDF.select(
    F.max(F.col('Unit_price').cast('float')).alias('max(Unit_price)'),
    F.max(F.col('Unit_sale_price').cast('float')).alias('max(Unit_sale_price)'),
).show()

In [None]:
from pyspark.sql import functions as F

# Cast the price columns to float before aggregating so the minimum is numeric
# even when the CSV columns were loaded as strings (the reader's default when
# no schema is supplied); on strings min() would compare lexicographically.
# F.min is referenced explicitly instead of relying on a wildcard import that
# shadows Python's builtin min. Aliases keep the original output labels.
salesDF.select(
    F.min(F.col('Unit_price').cast('float')).alias('min(Unit_price)'),
    F.min(F.col('Unit_sale_price').cast('float')).alias('min(Unit_sale_price)'),
).show()

## d. Define a table/view on the Spark DataFrame so that SQL queries can be run against it. (5 Marks)

In [None]:
# Register salesDF under the view name 'sales_temp' so it can be queried with SQL.
salesDF.createOrReplaceTempView('sales_temp')

# Select everything back from the view and preview the result.
sales_query = "SELECT * FROM sales_temp"
sales_temp1 = spark.sql(sales_query)
sales_temp1.show()

## e. Convert the Spark DataFrame to pandas and count the missing values in each column

In [None]:
# Convert salesDF into a pandas DataFrame.
# NOTE: toPandas() collects every row onto the driver, so this only suits data
# that fits in local memory.
sales=salesDF.toPandas()

In [None]:
# Display the pandas copy of the sales data as the cell output.
sales


In [None]:
# Count the missing values in each column of the pandas sales frame.
sales.isna().sum()

# Analyze the datasets by merging various DataFrames. Try to find the answers
## to questions such as:
# f. How many orders chose a particular channel? (10 marks)

In [None]:
# Total the Quantity column for each order channel code.
sales_by_channel = salesDF.groupBy('Order_channel_code')
quantity_per_channel = sales_by_channel.agg({'Quantity': 'sum'})
quantity_per_channel.show()

## g. Show country-wise retailer names. (10 marks)

In [None]:
# Keep the DataFrame itself in Retailer_sorted1: show() returns None, so
# assigning its result would leave the variable unusable. The rows are ordered
# by country (then retailer name) so retailers are listed country by country.
Retailer_sorted1 = (
    RetailersDetailsDF
    .select('Country', 'Retailer_name')
    .orderBy('Country', 'Retailer_name')
)
Retailer_sorted1.show()

## h. Number of products available under each product type.

In [None]:
# Count the non-null Product values within each Product_type.
products_by_type = ProductsDF.groupBy('Product_type')
product_counts = products_by_type.agg({'Product': 'count'})
product_counts.show()

## i. Find the minimum and maximum Unit_price for each Product_type. (5 marks)

In [None]:
from pyspark.sql import functions as F

# Cast Unit_price to float before aggregating so max/min are numeric even when
# the CSV column was loaded as a string (the reader's default when no schema is
# supplied); on strings the comparison would be lexicographic.
# Aliases keep the original output column labels.
unit_price = F.col('Unit_price').cast('float')
ProductsDF.groupBy('Product_type').agg(
    F.max(unit_price).alias('max(Unit_price)'),
    F.min(unit_price).alias('min(Unit_price)'),
).show()