In [None]:
import os  # needed here: this config cell runs before the imports cell

# Source endpoints. The case-study bucket serves one file per name at
# API_PREFIX + <name> + API_SUFFIX; API is an alternative single-file source.
API_PREFIX = "https://merkle-de-interview-case-study.s3.eu-central-1.amazonaws.com/de/"
API_SUFFIX = ".csv"
NAME_ARRAY = ["user", "order", "item", "event"]
API = "http://web.cs.wpi.edu/~cs1004/a16/Resources/SacramentoRealEstateTransactions"

# Local download directory. Override with the API_FETCH_DATA_DIR environment
# variable so the notebook is not tied to one machine. The value is concatenated
# directly with file names, so it must end with a path separator.
PATH = os.environ.get("API_FETCH_DATA_DIR", "C:\\Users\\GANESH\\Desktop\\pyspark\\API_FETCH_DATA\\")

# S3 destination prefix (s3a:// scheme, served by the hadoop-aws connector).
S3_PATH = "s3a://spark/csv/"

### Dependencies

In [None]:
import os

import pyspark
import requests
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, explode, udf

### Download Data From API

In [None]:
# Download each source file and save it as PATH + <name> + API_SUFFIX.
# NOTE: every name currently fetches the same single file (API + API_SUFFIX);
# the per-name case-study URL is kept commented out for when it is used.
os.makedirs(PATH, exist_ok=True)  # open(..., "wb") fails if the directory is missing
for name in NAME_ARRAY:
    # url = API_PREFIX + name + API_SUFFIX
    url = API + API_SUFFIX
    file_name = name + API_SUFFIX

    # Reset per file so a failed request never re-saves the previous response.
    res = None
    try:
        res = requests.get(url, timeout=60)
    except requests.RequestException as e:
        print(f"FAILED TO DOWNLOAD {file_name}: {e}")
        continue

    if res.status_code != 200:
        print(f"FAILED TO DOWNLOAD {file_name}: HTTP {res.status_code}")
        continue

    # Write the body unchanged. The previous iter_lines(delimiter=b'\\n') split on
    # the two-byte sequence backslash+n (not a newline) and dropped it from the output.
    with open(PATH + file_name, "wb") as file:
        file.write(res.content)
    print(f"SUCCESSFULLY DOWNLOAD: {file_name}")

### Create Spark Session

In [None]:
# Build (or reuse) the notebook's SparkSession. The hadoop-aws package provides
# the S3A filesystem used for the s3a:// paths in later cells.
# NOTE(review): hadoop-aws/hadoop-common versions should match the Hadoop
# version this Spark build ships with — confirm 2.7.3 is right.
spark = (
    SparkSession.builder
    .config("spark.app.name", "s3app")
    .config(
        "spark.jars.packages",
        "org.apache.hadoop:hadoop-aws:2.7.3,org.apache.hadoop:hadoop-common:2.7.3",
    )
    .getOrCreate()
)
print("SparkSession Created successfully")

In [None]:
# The underlying SparkContext; its JVM handle is used to reach the Hadoop configuration.
sc = spark.sparkContext
sc  # last expression: rendered by Jupyter

### HADOOP Configuration

In [None]:
# Configure the S3A connector directly on the Hadoop Configuration.
# Keys set here must NOT carry the `spark.hadoop.` prefix. That prefix is only
# meaningful on SparkConf, which strips it when copying into this Configuration,
# so a key like `spark.hadoop.fs.s3a.access.key` set here is never read by S3A.
hadoopConf = sc._jsc.hadoopConfiguration()

# Credentials are taken from the environment instead of being hardcoded in the notebook.
access_key = os.environ.get("AWS_ACCESS_KEY_ID")
secret_key = os.environ.get("AWS_SECRET_ACCESS_KEY")
if access_key and secret_key:
    hadoopConf.set("fs.s3a.access.key", access_key)
    hadoopConf.set("fs.s3a.secret.key", secret_key)
else:
    print("AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY not set; leaving S3A credentials unconfigured")

hadoopConf.set("fs.s3a.endpoint", "s3-us-east-2.amazonaws.com")
hadoopConf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")

### Read Data From S3 Bucket

In [None]:
# Smoke-test S3 access by reading a sample CSV and displaying its first rows.
# DataFrame.show() prints the table itself and returns None, so it is not
# wrapped in print() (which only added a stray "None" line).
s3_df = spark.read.csv("s3a://bucket_name/dummy.csv", header=True, inferSchema=True)
s3_df.show()

### Load Data to S3 Bucket

In [None]:
# Upload every downloaded source file (PATH + <name> + API_SUFFIX) to S3_PATH + <name>.
# No inferSchema: every column stays a string, so values are not reformatted by
# type inference before they land in the raw layer.
# Spark writes a directory of part files at the target prefix, not a single .csv object.
for name in NAME_ARRAY:
    raw_df = spark.read.csv(PATH + name + API_SUFFIX, header=True)
    # "overwrite" keeps the cell re-runnable; the default mode fails if the prefix exists.
    raw_df.write.option("header", "true").mode("overwrite").csv(S3_PATH + name)


In [None]:
# Shut down the session and its underlying SparkContext.
spark.stop()
print("SparkSession stopped")

### LAYER 1

***Contains external tables for all prerequisite files.\
All attributes are of STRING type. No transformations are applied***

In [None]:
-- NOTE(review): this cell holds SQL inside a Python-kernel notebook; run it via
-- spark.sql(...) or a SQL cell magic. The layer description asks for external
-- tables over the source files, but no EXTERNAL/LOCATION is declared — confirm.
-- Raw event file: every column kept as STRING, no transformation applied.
-- The source column name contains a dot, so it must be back-quoted; unquoted,
-- `event.payload` is not a valid column identifier and the DDL fails to parse.
CREATE TABLE event_raw (
    event_id        STRING,
    event_time      STRING,
    user_id         STRING,
    `event.payload` STRING
);
                 
-- Raw user file: every column kept as STRING, no transformation applied.
CREATE TABLE user_raw (
    created_at      STRING,
    deleted_at      STRING,
    email_address   STRING,
    first_name      STRING,
    id              STRING,
    last_name       STRING,
    merged_at       STRING,
    parent_user_id  STRING
);

-- Raw order file (one row per invoice line): every column kept as STRING.
CREATE TABLE order_raw (
    InvoiceId     STRING,
    LineItemId    STRING,
    UserId        STRING,
    ItemId        STRING,
    ItemName      STRING,
    ItemCategory  STRING,
    Price         STRING,
    CreatedAt     STRING,
    PaidAt        STRING
);
                 
-- Raw item catalogue file: every column kept as STRING.
CREATE TABLE item_raw (
    adjective   STRING,
    category    STRING,
    created_at  STRING,
    id          STRING,
    modifier    STRING,
    name        STRING,
    price       STRING
);

### LAYER 2

***▪ Contains all datasets from the first layer \
▪ All attributes have common naming convention \
▪ All attributes have proper datatypes based on the attribute name and
common logic\
▪ All struct collection attributes are flattened and transformed to proper data
types\
▪ Fact tables are properly partitioned based on meaningful attributes***

In [None]:
-- NOTE(review): SQL inside a Python-kernel notebook; run via spark.sql(...) or a SQL cell magic.
-- Layer 2 events: typed columns, with the JSON payload flattened into columns.
-- `event.payload` holds one JSON object per row, e.g.
--   {"event_name":"view_item","platform":"android","parameter_name":"item_id","parameter_value":"3526"}
-- so its fields are extracted with get_json_object; a single object needs no explode.
-- Timestamp pattern translated from the draft's 'MM/DD/YYYY HH:MI:ss'. TODO: confirm it
-- matches the source data; with ANSI mode off, to_timestamp returns NULL on a mismatch.
-- TODO: this is a fact table and the layer spec asks for partitioning on a
-- meaningful attribute (e.g. event date); not implemented yet.
CREATE TABLE event_raw_input AS
SELECT
    event_id,
    to_timestamp(event_time, 'MM/dd/yyyy HH:mm:ss')          AS event_time,
    CAST(user_id AS BIGINT)                                   AS user_id,
    get_json_object(`event.payload`, '$.event_name')          AS event_name,
    get_json_object(`event.payload`, '$.platform')            AS platform,
    get_json_object(`event.payload`, '$.parameter_name')      AS parameter_name,
    get_json_object(`event.payload`, '$.parameter_value')     AS parameter_value
FROM event_raw;
                 
-- Layer 2 users: timestamps parsed, ids cast to BIGINT (Spark has no UNSIGNED type),
-- and id renamed to user_id.
-- Timestamp pattern translated from the draft's 'MM/DD/YYYY HH:MI:ss'. TODO: confirm it
-- matches the source data; with ANSI mode off, to_timestamp returns NULL on a mismatch.
CREATE TABLE user_raw_input AS
SELECT
    to_timestamp(created_at, 'MM/dd/yyyy HH:mm:ss')  AS created_at,
    to_timestamp(deleted_at, 'MM/dd/yyyy HH:mm:ss')  AS deleted_at,
    email_address,
    first_name,
    CAST(id AS BIGINT)                                AS user_id,
    last_name,
    to_timestamp(merged_at, 'MM/dd/yyyy HH:mm:ss')   AS merged_at,
    CAST(parent_user_id AS BIGINT)                    AS parent_user_id
FROM user_raw;


-- Layer 2 orders: snake_case names, ids cast to BIGINT (Spark has no UNSIGNED type),
-- and timestamps parsed.
-- price is DECIMAL rather than FLOAT so that yearly and total sales sums do not
-- accumulate binary floating-point error. Assumes at most 2 decimal places — TODO confirm.
-- Timestamp pattern translated from the draft's 'MM/DD/YYYY HH:MI:ss'. TODO: confirm it
-- matches the source data; with ANSI mode off, to_timestamp returns NULL on a mismatch.
-- TODO: fact table; the layer spec asks for partitioning on a meaningful attribute.
CREATE TABLE order_raw_input AS
SELECT
    CAST(InvoiceId AS BIGINT)                         AS invoice_id,
    CAST(LineItemId AS BIGINT)                        AS line_item_id,
    CAST(UserId AS BIGINT)                            AS user_id,
    CAST(ItemId AS BIGINT)                            AS item_id,
    ItemName                                          AS item_name,
    ItemCategory                                      AS item_category,
    CAST(Price AS DECIMAL(18, 2))                     AS price,
    to_timestamp(CreatedAt, 'MM/dd/yyyy HH:mm:ss')    AS created_at,
    to_timestamp(PaidAt, 'MM/dd/yyyy HH:mm:ss')       AS paid_at
FROM order_raw;
                 
-- Layer 2 items: ids cast to BIGINT, category/name/id renamed to item_category/item_name/item_id.
-- price is DECIMAL (assumes at most 2 decimal places — TODO confirm) for exact money arithmetic.
-- Timestamp pattern translated from the draft's 'MM/DD/YYYY HH:MI:ss'. TODO: confirm it
-- matches the source data; with ANSI mode off, to_timestamp returns NULL on a mismatch.
CREATE TABLE item_raw_input AS
SELECT
    adjective,
    category                                          AS item_category,
    to_timestamp(created_at, 'MM/dd/yyyy HH:mm:ss')   AS created_at,
    CAST(id AS BIGINT)                                AS item_id,
    modifier,
    name                                              AS item_name,
    CAST(price AS DECIMAL(18, 2))                     AS price
FROM item_raw;


### LAYER 3

***▪ Contains following data marts:***

***• “top_items” data mart with all sold items with additional attributes:***\
o For every year (based on the created_at attribute):\
     ▪ Total number of items sold in a particular year\
     ▪ Rank of an item based on the total number of items sold in a particular year\
     ▪ Total sales from an item in a particular year\
     ▪ Rank of an item based on the total sales in a particular year\
o Total number of items sold in all years\
o Rank of an item based on the total number of items sold\
o Total sales of an item in all years\
o Rank of an item based on the total sales\


### For every year

In [None]:
# top_items, per-year part: for every (year, item), the units sold and the sales
# amount, plus the item's rank within that year on each measure.
# Assumes the Layer 2 table `order_raw_input` is registered in the active Spark
# session and that each row is one sold line item.
# TODO confirm whether unpaid orders (paid_at IS NULL) should be excluded.
order_items = spark.table("order_raw_input")

yearly_item_totals = (
    order_items
    .withColumn("year", F.year("created_at"))  # "year" is based on the order's created_at
    .groupBy("year", "item_id", "item_name")
    .agg(
        F.count("line_item_id").alias("items_sold_in_year"),
        F.sum("price").alias("sales_in_year"),
    )
)

# Ranks restart each year; rank() gives tied items the same rank.
units_in_year = Window.partitionBy("year").orderBy(F.desc("items_sold_in_year"))
sales_in_year = Window.partitionBy("year").orderBy(F.desc("sales_in_year"))

top_items_per_year = (
    yearly_item_totals
    .withColumn("rank_by_items_sold_in_year", F.rank().over(units_in_year))
    .withColumn("rank_by_sales_in_year", F.rank().over(sales_in_year))
)
top_items_per_year.orderBy("year", "rank_by_items_sold_in_year").show(20, truncate=False)


### For all years

In [None]:
# top_items, all-years part: per item, the total units sold and the total sales
# amount across every year, each with an overall rank (ties share a rank).
# Assumes the Layer 2 table `order_raw_input` is registered in the active Spark
# session and that each row is one sold line item.
order_items = spark.table("order_raw_input")

item_totals = (
    order_items
    .groupBy("item_id", "item_name")
    .agg(
        F.count("line_item_id").alias("total_items_sold"),
        F.sum("price").alias("total_sales"),
    )
)

# Un-partitioned windows pull all rows into one partition; acceptable here
# because the input is already aggregated to one row per item.
by_total_units = Window.orderBy(F.desc("total_items_sold"))
by_total_sales = Window.orderBy(F.desc("total_sales"))

top_items_all_years = (
    item_totals
    .withColumn("rank_by_total_items_sold", F.rank().over(by_total_units))
    .withColumn("rank_by_total_sales", F.rank().over(by_total_sales))
)
top_items_all_years.orderBy("rank_by_total_items_sold").show(20, truncate=False)


***• “top_buyers” data mart with the top 20 customers who contributed the most to total sales, with additional attributes:***\
o Total sales contributed\
o Rank based on the total sales\
o Last order creation date\
o The overall most viewed item of a customer

In [None]:
# top_buyers: the 20 customers with the highest total sales, with their rank,
# last order creation date and overall most-viewed item.
# Assumes the Layer 2 tables `order_raw_input` and `event_raw_input` are
# registered in the active Spark session.
order_items = spark.table("order_raw_input")
events = spark.table("event_raw_input")

buyer_totals = (
    order_items
    .groupBy("user_id")
    .agg(
        F.sum("price").alias("total_sales"),
        F.max("created_at").alias("last_order_created_at"),
    )
    .withColumn("rank_by_total_sales", F.rank().over(Window.orderBy(F.desc("total_sales"))))
)

# Item views are events named 'view_item' whose parameter is the viewed item's id
# (see the sample payload in Layer 2). The id is cast to BIGINT to match order item_id.
item_views = (
    events
    .where((F.col("event_name") == "view_item") & (F.col("parameter_name") == "item_id"))
    .groupBy("user_id", F.col("parameter_value").cast("bigint").alias("item_id"))
    .agg(F.count(F.lit(1)).alias("view_count"))
)

# One row per user. Ties on view_count are broken by the lowest item_id so the result is deterministic.
most_viewed = (
    item_views
    .withColumn(
        "view_rank",
        F.row_number().over(Window.partitionBy("user_id").orderBy(F.desc("view_count"), "item_id")),
    )
    .where(F.col("view_rank") == 1)
    .select("user_id", F.col("item_id").alias("most_viewed_item_id"), "view_count")
)

top_buyers = (
    buyer_totals
    .join(most_viewed, on="user_id", how="left")  # buyers with no views keep NULLs
    .orderBy("rank_by_total_sales", "user_id")
    .limit(20)
)
top_buyers.show(20, truncate=False)

### Read data from local

In [None]:
# Inspect one downloaded file locally.
# spark.stop() runs earlier in the notebook, so re-acquire a session first.
# getOrCreate() returns the active session if one exists, otherwise it builds a
# new one with default config.
spark = SparkSession.builder.getOrCreate()
df = spark.read.csv(PATH + 'user' + API_SUFFIX, inferSchema=True, header=True)
df.show(6, False)  # NOTE: user data may contain PII (e.g. email_address) — clear output before sharing
df.printSchema()

In [None]:
# Scratch cell, intentionally left without code. A stray undefined name here
# raised NameError and stopped "Run All"; `df = df` was a no-op.