In [None]:
import matplotlib.pyplot as plt

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkFiles

In [17]:
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
import requests

In [18]:
import tempfile
import csv

In [19]:
# Crea una instancia de GoogleAuth y autentica
gauth = GoogleAuth()
gauth.LocalWebserverAuth()

# Crea una instancia de GoogleDrive
drive = GoogleDrive(gauth)

In [20]:
spark = SparkSession.builder.appName("ReadCSVFromDrive").getOrCreate()
sc = spark.sparkContext

In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
import csv
import tempfile
from pyspark.sql.functions import lit

In [26]:
# ID de la carpeta que deseas listar
folder_id = "1I8bgqZV-LaKKlT8UvCxeobSsT4y4VuFd"

# Lista de archivos en la carpeta
file_list = drive.ListFile({'q': f"'{folder_id}' in parents and trashed=false"}).GetList()

# Definir las columnas del DataFrame vacío
schema = StructType([
    StructField("id", StringType(), True),
    StructField("event_time", TimestampType(), True),
    StructField("event_type", StringType(), True),
    StructField("product_id", StringType(), True),
    StructField("category_id", StringType(), True),
    StructField("category_code", StringType(), True),
    StructField("brand", StringType(), True),
    StructField("price", DoubleType(), True),
    StructField("user_id", StringType(), True),
    StructField("user_session", StringType(), True)
])

#Creo dataframe que contendra a todos los dataframes leidos del drive
df_all = spark.createDataFrame([], schema)

for file in file_list:
    
    if file['mimeType'] == 'text/csv':
        temp_file = tempfile.NamedTemporaryFile(delete=False)
        print(file['originalFilename'])
        try:
            file.GetContentFile(temp_file.name)
            
            # Leer el contenido del archivo CSV y crear un DataFrame de PySpark
            with open(temp_file.name, 'r') as csvfile:
                csv_reader = csv.reader(csvfile)
                # Saltar la primera fila si contiene encabezados
                next(csv_reader)
                
                # Crear una lista de filas para el DataFrame
                rows = [row for row in csv_reader]
                
                # Crear el DataFrame de PySpark
                df = spark.createDataFrame(rows )
                df = df.selectExpr(
                    "_1 as id",
                    "_2 as event_time",
                    "_3 as event_type",
                    "_4 as product_id",
                    "_5 as category_id",
                    "_6 as category_code",
                    "_7 as brand",
                    "_8 as price",
                    "_9 as user_id",
                    "_10 as user_session"
                )
                #Agrego dataframe a lista
                df_all = df_all.union(df) if df_all else df 
                # Muestra el DataFrame resultante
                #df.show()
                    
        finally:
            temp_file.close()
# Detener la sesión de Spark al finalizar
#spark.stop()


2019-Oct.csv
2020-Jan.csv
2019-Dec.csv
2019-Nov.csv


In [28]:
# Contar la cantidad de filas en el DataFrame
num_rows = df_all.count()

# Imprimir el resultado
print("Cantidad de filas:", num_rows)

Cantidad de filas: 2000000


In [29]:
from pyspark.sql.functions import col

In [30]:
# Cuenta la cantidad de ocurrencias de cada marca
#brand_counts = df.groupBy("brand").count()
brand_counts = df.where(col("brand") != '' ).groupBy("brand").count()

# Muestra el resultado
brand_counts.orderBy(col('count').desc()).show(5)


+-------+-----+
|  brand|count|
+-------+-----+
|samsung|62074|
|  apple|49605|
| xiaomi|36168|
| huawei|11758|
|lucente| 7634|
+-------+-----+
only showing top 5 rows



In [None]:
# Convertir el DataFrame de PySpark a un DataFrame de Pandas para visualización
brand_counts_pandas = brand_counts.toPandas()

# Crear el gráfico de barras
plt.figure(figsize=(10, 6))
plt.bar(brand_counts_pandas['brand'], brand_counts_pandas['count'])
plt.xlabel('Marca')
plt.ylabel('Cantidad')
plt.title('Recuento de Marcas')
plt.xticks(rotation=45)
plt.tight_layout()

# Mostrar el gráfico
plt.show()

In [31]:
event_type_counts = df.where(col("event_type") != '' ).groupBy("event_type").count()

# Muestra el resultado
event_type_counts.orderBy(col('count').desc()).show(3)


+----------+------+
|event_type| count|
+----------+------+
|      view|482642|
|  purchase|  9595|
|      cart|  7763|
+----------+------+



In [32]:
# Cuenta la cantidad de ocurrencias de cada tipo de evento después de filtrar los no nulos
event_type_counts = df.where(col("event_type") != '').groupBy("event_type").count()

# Calcula el total de registros en el DataFrame original
total_records = df.count()

# Agrega una columna que representa el porcentaje
event_type_counts_with_percentage = event_type_counts.withColumn(
    "percentage",
    (col("count") / total_records) * 100
)

# Ordena los resultados por la columna "count" en orden descendente
sorted_event_type_counts = event_type_counts_with_percentage.orderBy(col("count").desc())

# Muestra el resultado ordenado (solo las 3 primeras filas)
sorted_event_type_counts.show(3)

+----------+------+------------------+
|event_type| count|        percentage|
+----------+------+------------------+
|      view|482642|           96.5284|
|  purchase|  9595|1.9189999999999998|
|      cart|  7763|            1.5526|
+----------+------+------------------+



In [33]:
user_id_counts = df.where(col("user_id") != '' ).count()

# Muestra el resultado
print(user_id_counts)

500000


In [34]:
from pyspark.sql import functions as F

purchase_events = df.filter(df.event_type == "purchase")

user_purchase_counts = purchase_events.groupBy("user_id").agg(
    F.count("event_type").alias("purchase_count")
)

user_purchase_counts.show()


+-----------+--------------+
|    user_id|purchase_count|
+-----------+--------------+
|521734903.0|             1|
|564133858.0|             5|
|513114259.0|             1|
|516995448.0|             1|
|512691830.0|             1|
|558710419.0|             1|
|566302098.0|             2|
|521793571.0|             5|
|553458480.0|             1|
|523955361.0|             2|
|518956507.0|             1|
|562067899.0|             1|
|533189542.0|             1|
|519048208.0|             1|
|554459781.0|             1|
|547422663.0|             1|
|565824597.0|             1|
|543455415.0|             1|
|515412653.0|             1|
|518277746.0|             1|
+-----------+--------------+
only showing top 20 rows



In [35]:
from pyspark.sql.functions import datediff, min, expr

first_purchase_dates = purchase_events.groupBy("user_id").agg(min("event_time").alias("first_purchase"))
user_retention = first_purchase_dates.join(purchase_events, on="user_id").select(
    "user_id",
    (datediff("event_time", "first_purchase") / 30).cast("int").alias("months_since_first_purchase")
).distinct()

retention_counts = user_retention.groupBy("months_since_first_purchase").agg(
    F.count("user_id").alias("retained_users")
)

retention_counts.show()

+---------------------------+--------------+
|months_since_first_purchase|retained_users|
+---------------------------+--------------+
|                          0|          7389|
+---------------------------+--------------+

