#### Importación de librerias y variables globales como spark session, schema del fichero, o ruta de donde se lee el fichero json

In [None]:
import pandas as pd
import os
import pyspark
from pyspark.conf import SparkConf
from pyspark.sql.types import StructField, StructType, StringType, IntegerType
from pyspark.sql import SQLContext
from pyspark.context import SparkContext
spark = SparkSession.builder.config(conf=SparkConf()).getOrCreate()
schema = StructType([
    StructField('date', StringType(), True),
    StructField('price', IntegerType(), True),
    StructField('product_name', StringType(), True),
    StructField('purchased', StringType(), True),
    StructField('user_id', StringType(), True)
])
sc = SparkContext()
sqlContext = SQLContext(spark)
ruta_json = 'C://Users/ihuerta/Desktop/endpoint.json'

#### Método para saber el tamaño de un fichero y así utilizar pandas o spark dependiendo de su volumen

In [94]:
def file_size(route):
    return os.path.getsize(route)

#### Procesamiento con spark

In [221]:
def process_with_spark(spark,route,schema):
    if os.path.exists('./prueba_ckt.csv'):
        h = False
    else:
        h = True
    df = spark.read.json(route,schema)
    df.toPandas().to_csv('prueba_ckt.csv',mode='a', header=h,index= False)
    print('file processed with pyspark and include in the csv file')
    

#### Procesamiento con pandas

In [222]:
def process_with_pandas(route):
    if os.path.exists('./prueba_ckt.csv'):
        h = False
    else:
        h = True
    df = pd.read_json(route)
    df.to_csv('prueba_ckt.csv', mode='a', header=h,index= False)
    print('file processed with pandas and include in the csv file')
    
    

#### Si el fichero es mayor de 0.5 GigaBytes se procesará con spark ya que es un tamaño decente para que entre spark en acción.

In [220]:
def main(route,spark,schema):
    if file_size(route) > 50000000:
        process_with_spark(spark,route,schema)
    else:
        process_with_pandas(route)
        

#### lectura del fichero con spark ya que irá creciendo  y gracias a su SQL se pueden hacer consultas sencillas y bastante rápidas

In [225]:
def query_questions(route,spark,user,date,date2):
    df = spark.read.format("csv").option("header",True).load(route)
    df.createOrReplaceTempView("people")
    # saber que compras que ha realizado un usuario
    print(f"El usuario {user} ha realizado estas compras:")
    spark.sql(f"select * from people where user_id ={user}").show()

    # variación de un producto por fecha
    print(f"producto vendidos en el mes {date}")
    spark.sql(f"select product_name,count(product_name),date from people where date > '{date}' group by product_name,date").show()
    
    # compras por mes en un año
    print(f"productos vendidos entre las fechas {date} y {date2}")
    spark.sql(f"select count(product_name) from people where date > '{date}' and date < '{date2}'",).show()
    

In [223]:
main(ruta_json,spark,schema)

file processed with pandas and include in the csv file


In [226]:
query_questions("prueba_ckt.csv",spark,1,'2021-01-01','2021-12-23')

El usuario 1 ha realizado estas compras:
+-------+----------+------------+-----+---------+
|user_id|      date|product_name|price|purchased|
+-------+----------+------------+-----+---------+
|      1|2020-12-12|  zapatillas|   50|     True|
|      1|2020-10-07|    camiseta|   15|    False|
|      1|2021-12-24|    cinturon|   50|     True|
|      1|2020-12-12|  zapatillas|   50|     True|
|      1|2020-10-07|    camiseta|   15|    False|
|      1|2021-12-24|    cinturon|   50|     True|
|      1|2020-12-12|  zapatillas|   50|     True|
|      1|2020-10-07|    camiseta|   15|    False|
|      1|2021-12-24|    cinturon|   50|     True|
+-------+----------+------------+-----+---------+

producto vendidos en el mes 2021-01-01
+------------+-------------------+----------+
|product_name|count(product_name)|      date|
+------------+-------------------+----------+
|       gorra|                  3|2021-11-01|
|       traje|                  3|2021-02-24|
|    cinturon|                  3|2021-