In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import TimestampType
import pyspark.sql.functions as F
from pyspark.sql.window import Window
import folium
from folium import plugins
from IPython.display import clear_output
import pandas as pd

spark = SparkSession.builder.getOrCreate()

# Folder containing the raw Olist CSV exports.
DATA_DIR = '/home/jovyan/work/EPAM_sales_analysis/data'


def read_olist_csv(file_name):
    """Load one CSV file from DATA_DIR as a Spark DataFrame.

    The first row is used as the header and column types are inferred by
    Spark (inferSchema requires an extra pass over the file).
    """
    return spark.read \
        .options(header='True', inferSchema='True', delimiter=',') \
        .csv(f'{DATA_DIR}/{file_name}')


# Reading the CSV files and selecting only the needed columns.

# Order items data
items = read_olist_csv('olist_order_items_dataset.csv')\
    .select('order_id', 'seller_id', 'price')

# Sellers data; the zip prefix is renamed to `zip_code` so it can be
# joined with `geo`.
sellers = read_olist_csv('olist_sellers_dataset.csv')\
    .select('seller_id', 'seller_zip_code_prefix', 'seller_city', 'seller_state')\
    .withColumnRenamed('seller_zip_code_prefix', 'zip_code')

# Geolocation data: the file may hold several coordinates per zip code
# prefix, so they are averaged into a single point per prefix. This keeps
# a join on zip_code from duplicating sellers.
geo = read_olist_csv('olist_geolocation_dataset.csv')\
    .select('geolocation_zip_code_prefix', 'geolocation_lat', 'geolocation_lng')\
    .withColumnRenamed('geolocation_zip_code_prefix', 'zip_code')\
    .groupBy('zip_code')\
    .agg(F.avg('geolocation_lat').alias('lat'),
         F.avg('geolocation_lng').alias('lng'))

# Orders data
orders = read_olist_csv('olist_orders_dataset.csv')\
    .select('order_id', 'order_purchase_timestamp')

# Resulting schemas:
# items
# |-- order_id: string (nullable = true)
# |-- seller_id: string (nullable = true)
# |-- price: double (nullable = true)
# sellers
# |-- seller_id: string (nullable = true)
# |-- zip_code: integer (nullable = true)
# |-- seller_city: string (nullable = true)
# |-- seller_state: string (nullable = true)
# geo
# |-- zip_code: integer (nullable = true)
# |-- lat: double (nullable = true)
# |-- lng: double (nullable = true)
# orders
# |-- order_id: string (nullable = true)
# |-- order_purchase_timestamp: string (nullable = true)



# Total revenue per seller (sum of the prices of all items it sold), joined
# with the seller table so revenue can be broken down by city / state.
# Inner join: only sellers present in both tables are kept.
sales = (
    items
    .groupBy('seller_id')
    .agg(F.sum('price').alias('revenue'))
    .join(sellers, ['seller_id'])
)


# Top 2 sellers (by revenue) in every state and in every city.
def top_two_sellers(sales_df, location_col, single_seller_msg):
    """Pivot sales_df into one row per location holding its two best sellers.

    Parameters:
        sales_df: DataFrame with `seller_id`, `revenue` and `location_col`.
        location_col: column to partition by (e.g. 'seller_state').
        single_seller_msg: value written to `2_best_seller` when a location
            has only one seller.

    Returns a DataFrame with columns
    `location_col`, `1_best_seller`, `2_best_seller`.

    row_number (not rank) assigns the two slots so sellers tied on revenue
    still fill two distinct positions; with rank, a tie for first place left
    `2_best_seller` empty and the location was wrongly reported as having a
    single seller. seller_id breaks ties so the result is deterministic.
    """
    by_revenue = Window.partitionBy(location_col)\
        .orderBy(F.col('revenue').desc(), F.col('seller_id'))
    return sales_df\
        .withColumn('rank', F.row_number().over(by_revenue))\
        .filter(F.col('rank') <= 2)\
        .withColumn('col', F.concat(F.col('rank').cast('string'), F.lit('_best_seller')))\
        .groupBy(location_col)\
        .pivot('col', ['1_best_seller', '2_best_seller'])\
        .agg(F.first('seller_id'))\
        .na.fill(single_seller_msg, ['2_best_seller'])
    # Explicit pivot values skip Spark's extra distinct-values job and
    # guarantee `2_best_seller` exists for na.fill.


best_sellers_state = top_two_sellers(sales, 'seller_state', "In this state is only one seller")

best_sellers_state.show()

# NOTE(review): the file name says "city" although this is the per-state
# result; kept unchanged because downstream report code presumably reads it.
best_sellers_state.write.parquet("/home/jovyan/work/EPAM_sales_analysis/raport/transformed_data/4_task_city_best_sellers_state_df.parquet",mode="overwrite")

# NOTE(review): grouping by city name alone merges same-named cities from
# different states -- confirm this is intended.
best_sellers = top_two_sellers(sales, 'seller_city', "In this city is only one seller")

best_sellers.show()

best_sellers.write.parquet("/home/jovyan/work/EPAM_sales_analysis/raport/transformed_data/4_task_city_best_sellers_df.parquet",mode="overwrite")



# Sellers enriched with coordinates. Left join: sellers whose zip code has
# no geolocation row keep null lat/lng.
salesmap_df = sales.join(geo,'zip_code','left').orderBy(col('revenue').desc())


def _location_summary(df, location_col, counted_col):
    """Aggregate df per location_col into revenue / count / lat / lng.

    `revenue` is the summed revenue, `count` the number of non-null
    `counted_col` values (one per seller row), `lat`/`lng` the mean seller
    coordinates. Rows are sorted by revenue in ascending order.
    """
    per_location = df.groupBy(location_col).agg(
        F.sum('revenue').alias('revenue'),
        F.count(counted_col).alias('count'),
        F.avg('lat').alias('lat'),
        F.avg('lng').alias('lng'),
    )
    return per_location.orderBy(col('revenue'))


salescity_df = _location_summary(salesmap_df, 'seller_city', 'seller_state')

salesstate_df = _location_summary(salesmap_df, 'seller_state', 'seller_city')


# Lower bounds for count_group, from the largest group to the smallest.
# A value strictly greater than bound i falls into group i; a value that
# exceeds none of them falls into the last group (5).
_COUNT_GROUP_BOUNDS = (1500, 200, 100, 20, 10)


def count_group(x):
    """Map a count to a bucket index 0-5 (0: more than 1500, 5: 10 or fewer).

    The index is used to pick a marker size and colour from `sizes`/`colors`.
    """
    for group, bound in enumerate(_COUNT_GROUP_BOUNDS):
        if x > bound:
            return group
    return len(_COUNT_GROUP_BOUNDS)
def sum_group(x):
    """Map a revenue sum to a bucket index 0-5 (0: above 8,000,000, 5: 10,000 or less).

    The bucket is 5 minus the number of thresholds x strictly exceeds; since
    the thresholds are increasing this equals a top-down if/elif chain.
    Builtin `sum` is avoided on purpose: the file's
    `from pyspark.sql.functions import *` shadows it.
    """
    thresholds = (10000, 50000, 500000, 1000000, 8000000)
    exceeded = [limit for limit in thresholds if x > limit]
    return len(thresholds) - len(exceeded)
    

# Marker styling per bucket: entry i is used for rows that count_group()
# places in bucket i (0 = highest count).
colors = ['#CE4C18','#D58321','#D5B421','#C3E839','#43A85F','#66CF83']
sizes = [25,12,8,5,3,1]


def add_marker_style(df, location_col):
    """Return location/revenue/count/lat/lng of df plus `size` and `colors`.

    The bucket from count_group(row["count"]) is computed once per row and
    indexes both `sizes` and `colors`. An explicit schema is passed to toDF:
    without it Spark infers the column types by pulling rows from the RDD
    (an extra job over the joined, sorted data) and raises if the first 100
    rows all have null lat/lng (possible after the left geo join).
    """
    def styled(row):
        group = count_group(row["count"])
        return (row[location_col], row["revenue"], row["count"], row["lat"],
                row["lng"], sizes[group], colors[group])

    schema = (f"`{location_col}` STRING, `revenue` DOUBLE, `count` BIGINT, "
              "`lat` DOUBLE, `lng` DOUBLE, `size` BIGINT, `colors` STRING")
    return df.rdd.map(styled).toDF(schema)


state_df = add_marker_style(salesstate_df, "seller_state")
city_df = add_marker_style(salescity_df, "seller_city")

# Saving files that will be used when creating report
state_df.write.parquet("/home/jovyan/work/EPAM_sales_analysis/raport/transformed_data/6_task_salesstate_df.parquet" \
                            ,mode="overwrite")
city_df.write.parquet("/home/jovyan/work/EPAM_sales_analysis/raport/transformed_data/4_task_salescity_df.parquet" \
                           ,mode="overwrite")

state_df.show()
city_df.show()

22/08/19 13:50:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/19 13:50:57 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
                                                                                

+------------+--------------------+--------------------+
|seller_state|       1_best_seller|       2_best_seller|
+------------+--------------------+--------------------+
|          SC|04308b1ee57b6625f...|eeb6de78f79159600...|
|          RO|3364a91ec4d56c98e...|a5259c149128e82c9...|
|          PI|47efca563408aae19...|In this state is ...|
|          AM|327b89b872c14d1c0...|In this state is ...|
|          GO|9d5a9018aee56acb3...|750303a20e9c56b2a...|
|          MT|2dee2ce60de9709b1...|abcd2cb37d46c2c8f...|
|          SP|4869f7a5dfa277a7d...|4a3ca9315b744ce9f...|
|          ES|001cca7ae9ae17fb1...|33dd941c27854f762...|
|          PB|a6bd7d1ccdac48c6b...|07017df32dc5f2f1d...|
|          RS|87142160b41353c4e...|b32be1695eb7ec5f1...|
|          MS|b1fecf4da1fa2689b...|9c068d10aca38e85c...|
|          MG|a1043bafd471dff53...|25c5c91f63607446a...|
|          PA|67225bff54a172ff6...|In this state is ...|
|          BA|53243585a1d6dc264...|c72de06d72748d1a0...|
|          SE|4b39558c138930b9e

22/08/19 13:51:32 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
                                                                                

+--------------------+--------------------+--------------------+
|         seller_city|       1_best_seller|       2_best_seller|
+--------------------+--------------------+--------------------+
|           igrejinha|da20530872245d6cd...|In this city is o...|
|             brusque|c33847515fa6305ce...|ad97a199236354e53...|
|            buritama|2c4c47cb51acd5ea5...|In this city is o...|
|         carapicuiba|f181738b150df1f37...|f680f85bee2d25355...|
|    fernando prestes|60da8bfa7eebe230b...|In this city is o...|
|               garca|527801b552d0077ff...|c12b92bf1c350f3e6...|
|             ipaussu|e333046ce6517bd8b...|In this city is o...|
|  sao joao de meriti|3c03b12bab54d8b37...|117cfc326c6d50da6...|
|              araras|45213867cefbf2cd4...|31da954dc0855f249...|
|           jacutinga|7a241947449cc45db...|198c7ea11960a9844...|
|       nova friburgo|2bf6a2c1e71bbd29a...|c26a2be5b53b7db6b...|
| sao pedro da aldeia|da6a60cc8cc724fe5...|In this city is o...|
|itapecerica da serra|c9c