In [1]:
import os
import time
from pyspark.sql.utils import AnalysisException
from pyspark.sql.dataframe import DataFrame
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import size, lit, explode, col, round, dense_rank, rank, desc
from pyspark.sql.window import Window
import json
from functools import reduce

In [2]:
def get_json_file(file):
    """Load a JSON file into a Spark DataFrame.

    Prints an ANSI-colored progress line naming the file, then reads it
    through the notebook-level ``pyspark_session`` with the ``multiline``
    reader option set to ``"true"``.

    Args:
        file: Path of the JSON file to read.

    Returns:
        Whatever the session's JSON reader returns for ``file``.
    """
    banner = f'\n\x1b[1;33;40mRead {file} Json Data...\x1b[0m\n'
    print(banner)
    # pyspark_session is a notebook global; it must exist before the first call.
    reader = pyspark_session.read.option("multiline", "true")
    return reader.json(file)

def get_csv_file(file, sep=None):
    """Load a CSV file (with a header row) into a Spark DataFrame.

    Prints an ANSI-colored progress line naming the file, then reads it
    through the notebook-level ``pyspark_session`` with ``header=True``.

    Args:
        file: Path of the CSV file to read.
        sep: Field separator forwarded to the reader; ``None`` leaves the
            reader's own default in effect.

    Returns:
        Whatever the session's CSV reader returns for ``file``.
    """
    banner = f'\n\x1b[1;33;40mRead {file} Csv Data...\x1b[0m\n'
    print(banner)
    # No schema options are passed here, only the header flag and separator.
    return pyspark_session.read.csv(file, sep=sep, header=True)


def first_dataframe(file):
    """Build the sellers DataFrame from a JSON file and export it by points sign.

    The seller attributes are read from the nested fields of the FIRST
    column of the loaded JSON (the original code addressed it positionally
    as ``x[0]``) and projected to flat, renamed columns. The projection is
    done with DataFrame column expressions instead of an RDD ``map`` +
    ``toDF`` round-trip, so rows are not serialized through Python and the
    column types come from the JSON reader rather than being re-inferred.

    Side effect: the resulting DataFrame is passed to ``first_save_file``,
    which writes CSV files to disk.

    Args:
        file: Path of the sellers JSON file.

    Returns:
        DataFrame with columns ``siteId``, ``sellerId``, ``sellerNickname``
        and ``sellerPoints``.
    """
    raw = get_json_file(file)

    # Positional access kept on purpose: the name of the wrapping column is
    # not visible in this notebook, only that it is the first one.
    # NOTE(review): assumes that column name needs no special quoting.
    seller = raw[raw.columns[0]]

    # (nested JSON field, output column name), in output order.
    field_to_column = (
        ("site_id", "siteId"),
        ("id", "sellerId"),
        ("nickname", "sellerNickname"),
        ("points", "sellerPoints"),
    )
    data = raw.select(*[seller[field].alias(name) for field, name in field_to_column])

    first_save_file(data)
    return data
    
def first_save_file(data, base_dir="MPE/2022/10/29"):
    """Split sellers by the sign of ``sellerPoints`` and write one CSV per group.

    For each of the groups ``positivo`` (> 0), ``cero`` (== 0) and
    ``negativo`` (< 0) the matching rows are written to
    ``<base_dir>/<group>/<group>.csv`` without the pandas index. Each group
    is converted with ``toPandas()`` before being written.

    Args:
        data: DataFrame exposing a ``sellerPoints`` column.
        base_dir: Directory under which the per-group folders are created.
            Defaults to the path that was previously hard-coded, so existing
            callers keep writing to the same location.
    """
    points = data.sellerPoints
    # Group name -> row predicate; insertion order is the write order.
    predicates = {
        'positivo': points > 0,
        'cero': points == 0,
        'negativo': points < 0,
    }

    for archivo, predicate in predicates.items():
        folder = os.path.join(base_dir, archivo)
        # exist_ok avoids the check-then-create race and makes re-runs safe.
        os.makedirs(folder, exist_ok=True)
        file_csv = os.path.join(folder, f"{archivo}.csv")
        data.filter(predicate).toPandas().to_csv(file_csv, index=False)


def second_dataframe(file, columns=['rowId','itemId', 'soldQuantity', 'availableQuantity']):
    """Flatten the ``results`` array of a JSON file into one row per item.

    Only the first row of the loaded JSON is used, as before. Its
    ``results`` array is fetched to the driver once and turned into tuples
    ``(rowId, id, sold_quantity, available_quantity)``, where ``rowId`` is
    the 1-based position of the item in the array. The previous
    implementation launched a separate ``rdd.map(...).collect()`` job for
    every array element.

    Args:
        file: Path of the JSON file containing a ``results`` array.
        columns: Column names for the resulting DataFrame, in tuple order.
            The default list is never mutated by this function.

    Returns:
        DataFrame created from the flattened items with ``columns`` as schema.
    """
    data = get_json_file(file)

    # Single driver round-trip for the whole array.
    first_row = data.select("results").first()
    items = (first_row["results"] if first_row is not None else None) or []

    total_values = [
        (row_id, item['id'], item['sold_quantity'], item['available_quantity'])
        for row_id, item in enumerate(items, start=1)
    ]
    return pyspark_session.createDataFrame(data=total_values, schema=columns)


def third_dataframe(data_2, file):
    """Attach visit counts to the items and keep only items that sold.

    The visits CSV is loaded with ``get_csv_file`` and inner-joined to
    ``data_2`` on ``itemId``. The result is reduced to ``itemId``,
    ``soldQuantity`` and ``visits`` and then filtered to
    ``soldQuantity > 0``.

    Args:
        data_2: Items DataFrame exposing ``itemId`` and ``soldQuantity``.
        file: Path of the visits CSV, exposing ``itemId`` and ``visits``.

    Returns:
        The joined, projected and filtered DataFrame.
    """
    visits_df = get_csv_file(file)

    same_item = data_2.itemId == visits_df.itemId
    joined = data_2.join(visits_df, same_item, "inner")

    # itemId is taken from data_2 so the projected frame has a single itemId.
    projected = joined.select(data_2.itemId, data_2.soldQuantity, visits_df.visits)
    return projected.filter(data_2.soldQuantity > 0)


def forth_dataframe(data_3):
    """Add a conversion rate and its ranking to the joined items.

    ``conversionRate`` is ``soldQuantity / visits`` rounded to 4 decimals.
    ``conversionRanking`` is the dense rank of that rate in descending
    order over a window with no partitioning.

    Args:
        data_3: DataFrame exposing ``soldQuantity`` and ``visits``.

    Returns:
        ``data_3`` plus the ``conversionRate`` and ``conversionRanking``
        columns, sorted by ``conversionRate`` descending before ranking.
    """
    # `round` is the pyspark.sql.functions import, not the Python builtin.
    rate = round(data_3.soldQuantity / data_3.visits, 4)
    ranking_window = Window.orderBy(desc('conversionRate'))

    with_rate = data_3.withColumn("conversionRate", rate)
    ordered = with_rate.sort(col("conversionRate").desc())
    return ordered.withColumn("conversionRanking", dense_rank().over(ranking_window))


def fifth_dataframe(data_2):
    """Compute each item's share of the total available stock.

    The sum of ``availableQuantity`` is collected to the driver first.
    ``stockPercentage`` is then ``availableQuantity / total * 100`` rounded
    to 2 decimals.

    Args:
        data_2: Items DataFrame exposing ``itemId`` and ``availableQuantity``.

    Returns:
        DataFrame with ``itemId``, ``availableQuantity`` and
        ``stockPercentage``, sorted by ``stockPercentage`` descending.
    """
    totals = data_2.agg({'availableQuantity': 'sum'}).collect()
    total_available = totals[0][0]

    # `round` is the pyspark.sql.functions import, not the Python builtin.
    share = round((data_2.availableQuantity / total_available) * 100 , 2)

    with_share = data_2.withColumn("stockPercentage", share)
    ranked = with_share.sort(col("stockPercentage").desc())
    return ranked.select("itemId","availableQuantity","stockPercentage")


In [3]:
# Notebook-wide Spark session; the reader helpers and second_dataframe use
# this global, so this cell has to run before any of them is called.
pyspark_session = (
    SparkSession.builder
    .appName("spark_session")
    .config('spark.driver.memory', '6g')
    .getOrCreate()
)

# Handle to the underlying SparkContext of the session.
sparkContext = pyspark_session.sparkContext

In [4]:
# Build the sellers DataFrame from the sellers JSON and preview 5 rows.
# first_dataframe also calls first_save_file, so this cell writes CSV files.
df_1 = first_dataframe("jsonfiles/Sellers.json")
df_1.show(5)


[1;33;40mRead Json Data...[0m

+------+---------+---------------+------------+
|siteId| sellerId| sellerNickname|sellerPoints|
+------+---------+---------------+------------+
|   MPE|298734964| MARIELATAQUIRE|           2|
|   MPE|183049329|    MURO8709951|          -3|
|   MPE| 94592189|     ILLARYPERU|          -2|
|   MPE|520133997|ISABELLADELPOZO|           1|
|   MPE|684964436|    PHMO1747353|           0|
+------+---------+---------------+------------+
only showing top 5 rows



In [5]:
# Flatten the `results` array of the items JSON into one row per item
# (rowId, itemId, soldQuantity, availableQuantity) and preview 5 rows.
df_2 = second_dataframe("jsonfiles/MPE1004.json")
df_2.show(5)


[1;33;40mRead Json Data...[0m

+-----+------------+------------+-----------------+
|rowId|      itemId|soldQuantity|availableQuantity|
+-----+------------+------------+-----------------+
|    1|MPE433108265|           6|                9|
|    2|MPE434382765|           6|                3|
|    3|MPE433853177|           3|               17|
|    4|MPE419883282|          15|               18|
|    5|MPE431714651|          15|                1|
+-----+------------+------------+-----------------+
only showing top 5 rows



In [6]:
# Inner-join the items with the visits CSV on itemId, keeping only items
# with soldQuantity > 0, and preview 5 rows.
df_3 = third_dataframe(df_2, 'csvfiles/visits.csv')
df_3.show(5)


[1;33;40mRead Csv Data...[0m

+------------+------------+------+
|      itemId|soldQuantity|visits|
+------------+------------+------+
|MPE433108265|           6|   203|
|MPE434382765|           6|   170|
|MPE433853177|           3|  1034|
|MPE419883282|          15|  1772|
|MPE431714651|          15|    33|
+------------+------------+------+
only showing top 5 rows



In [7]:
# Add conversionRate (soldQuantity / visits) and its dense-rank ranking to
# the joined items, then preview the 5 highest-ranked rows.
df_4 = forth_dataframe(df_3)
df_4.show(5)

+------------+------------+------+--------------+-----------------+
|      itemId|soldQuantity|visits|conversionRate|conversionRanking|
+------------+------------+------+--------------+-----------------+
|MPE431714651|          15|    33|        0.4545|                1|
|MPE432291284|           2|     6|        0.3333|                2|
|MPE427140390|          10|    81|        0.1235|                3|
|MPE432439269|           2|    42|        0.0476|                4|
|MPE434382765|           6|   170|        0.0353|                5|
+------------+------------+------+--------------+-----------------+
only showing top 5 rows



In [8]:
# Compute each item's percentage of the total available stock (from df_2)
# and preview the 5 largest shares.
df_5 = fifth_dataframe(df_2)
df_5.show(5)

+------------+-----------------+---------------+
|      itemId|availableQuantity|stockPercentage|
+------------+-----------------+---------------+
|MPE433046443|              999|           70.3|
|MPE438492919|              100|           7.04|
|MPE436649728|              100|           7.04|
|MPE429448587|               50|           3.52|
|MPE431446248|               23|           1.62|
+------------+-----------------+---------------+
only showing top 5 rows



In [9]:
# Stop the Spark session created earlier in this notebook; this is the last cell.
pyspark_session.stop()