In [6]:
from pyspark.sql.functions import split, explode, col, regexp_replace, regexp_extract_all,  expr, array_join, trim, regexp_extract, concat_ws
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pyspark
import collections
import re
from pyspark.sql.types import IntegerType, DateType, StringType, StructType, StructField

# Crear una sesión de Spark
spark = SparkSession.builder.appName("EjemploPySpark").getOrCreate()

# Ruta al archivo CSV
file = 'data_user_reels_claudianicolasa.csv'

lines = spark.sparkContext.textFile(file)

'''
El csv tiene el problema de que un campo esta ocupando una o varias filas del csv
es_incompleta y unir_lineas_incompletas son dos metodos para solventar este problema
es_incompleta para comprobar si el campo usa mas de una linea
unir_lineas_incompletas para joinear esas lineas
'''

# Función para eliminar comas dentro de comillas dobles en una línea CSV
def eliminar_comas_entre_comillas(line):
    # Usamos una expresión regular que encuentra cualquier cosa entre comillas dobles
    # y luego eliminamos las comas que están dentro de ese texto
    return re.sub(r'\"(.*?)\"', lambda match: match.group(0).replace(",", ""), line)


# Función para verificar si la línea está incompleta
def es_incompleta(line):
    # Verifica si la línea contiene un número impar de comillas dobles
    return line.count('"') % 2 != 0

# Acumulamos las líneas hasta que tengamos una línea completa (con un número par de comillas)
def unir_lineas_incompletas(rdd):
    acumulado = []
    resultados = []

    for line in rdd.collect():
        acumulado.append(line)
        # Si la línea completa el conjunto de comillas
        if not es_incompleta(" ".join(acumulado)):
            # Unimos las líneas acumuladas y las añadimos al resultado
            resultados.append(" ".join(acumulado))
            acumulado = []
    
    return resultados

# Unir las líneas mal formadas en el RDD
lineas_corregidas = spark.sparkContext.parallelize(unir_lineas_incompletas(lines))


# Aplicamos la función a cada línea corregida para eliminar las comas entre comillas
rdd_sin_comas = lineas_corregidas.map(eliminar_comas_entre_comillas)

# Ahora convertimos cada línea en una lista de columnas usando split por comas
rdd_estructurado = rdd_sin_comas.map(lambda line: line.split(","))


# Creamos el schema del dataframe

esquema = StructType([
    StructField("id", StringType(), True),
    StructField("comentario", StringType(), True),
    StructField("like", StringType(), True),
    StructField("fecha", StringType(), False),
    StructField("tipopublicacion", StringType(), True),
    StructField("oldvisualizaciones", StringType(), True),
    StructField("visualizaciones", StringType(), True),
    StructField("titulo", StringType(), False)
])

     
# Convertimos el RDD en DataFrame usando toDF y asignamos nombres de columnas
df = spark.createDataFrame(rdd_estructurado, schema=esquema)

#df.printSchema()

#PROCEDEMOS A TRANSFORMAR ALGUNAS COLUMNAS

#Creamos una sola columna para las views
df = df.withColumn("totalViews", df.oldvisualizaciones + df.visualizaciones)

#En cuanto al titulo lo dividimos en la parte de hastag y el resto
df = df.withColumn("titulopublicacion", split(df["titulo"], "#").getItem(0)) 
df = df.withColumn("hashtagtodos", regexp_replace(df["titulo"], df["titulopublicacion"], "-"))
#df.select("hashtagtodos").show(truncate=False)
#df.select("titulopublicacion").show(truncate=False)
    #.withColumn("hastag2", split(df["titulo"], "#").getItem(1))\
      
#creamos nuevas columnas en funcion de la fecha de publicacion -> año, mes, dia_semana, hora
df = df.withColumn("fecha_anio", split(df["fecha"], "-").getItem(0)) 
df = df.withColumn("fecha_mes", split(df["fecha"], "-").getItem(1)) 
df = df.withColumn("fecha_hora", split(df["fecha"], " ").getItem(1)) 

df = df.withColumn("fecha_timestamp", F.to_timestamp(F.col("fecha"), "yyyy-MM-dd HH:mm:ssXXX"))
df = df.withColumn("fecha_hora_dia_semana", F.date_format(F.col("fecha_timestamp"), "EEEE"))

#Creamos dos nuevas columnas para el engagement por like y por comentario 
df = df.withColumn("engagement_like", df.like / df.totalViews)
df = df.withColumn("engagement_comentario", df.comentario / df.totalViews)
#df.select("engagement_like", "engagement_comentario").show(truncate=False)

#Eliminamos todas las filas que sus campos son todos nulos
df.na.drop()


#ANALISIS

df.groupBy("fecha_anio").agg({"totalViews":"sum","like":"sum", "comentario":"sum", "id": "count" }).orderBy("fecha_anio", ascending=False).show()
df.groupBy("fecha_mes").agg({"totalViews":"sum","like":"sum", "comentario":"sum", "id": "count"  }).orderBy("fecha_mes").show()
df.groupBy("fecha_hora_dia_semana").agg({"totalViews":"sum","like":"sum", "comentario":"sum", "id": "count"  }).orderBy("fecha_hora_dia_semana").show()
df.groupBy("fecha_hora_dia_semana").pivot("fecha_anio").count().orderBy("fecha_hora_dia_semana").show()




AttributeError: 'DataFrame' object has no attribute 'oldvisualizaciones'