In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
spark = SparkSession.builder \
  .appName('clean_products') \
  .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar') \
  .getOrCreate()  

In [2]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

In [3]:
table = "becade_rgarciaf.stg_compras"
stg_compras = spark.read \
  .format("bigquery") \
  .option("table", table) \
  .load()

stg_compras.show()

+-------------------+-----------------+----------+--------+------+--------------------+-------+-------------------+-----------+
|          compra_id|        client_id|product_id|cantidad|precio|            envio_id|isprime|       fecha_compra|metodo_pago|
+-------------------+-----------------+----------+--------+------+--------------------+-------+-------------------+-----------+
|831-0776824-5815830|831-175061-77-427|B00N69D6AS|       1|236.99|831-68-46270-8408...|   true|2011-12-10 00:00:00|  **** 0012|
|831-2232014-2295491|831-175061-77-427|B00N69D6AS|       1|236.99|831-89-84651-9806...|   true|2011-12-30 00:00:00|  **** 0012|
|831-7168743-3850172|831-175061-77-427|B00N69D6AS|       1|236.99|831-53-86748-7636...|   true|2012-10-27 00:00:00|  **** 0012|
|831-6524226-3868173|831-175061-77-427|B00N69D6AS|       1|236.99|831-01-13591-0848...|   true|2012-11-01 00:00:00|  **** 0012|
|831-7422339-0701440|831-175061-77-427|B00N69D6AS|       1|236.99|831-58-81017-4457...|   true|2012-11-0

In [4]:
from pyspark.sql.functions import year, month, dayofmonth,col
from pyspark.sql.functions import sum, col, desc, avg, count

In [5]:
compras_2 = stg_compras.select("compra_id","client_id","product_id","cantidad","precio","isprime","metodo_pago",
    year("fecha_compra").alias('year'), 
    month("fecha_compra").alias('month'), 
    dayofmonth("fecha_compra").alias('day')
)

In [7]:
################################################## compras_anuales ##################################################

In [8]:
compras_3 = compras_2.withColumn("cantidad",compras_2.cantidad.cast("int"))\
    .withColumn("precio",compras_2.precio.cast("float"))

In [10]:
compras_anuales = compras_3.groupBy("year")\
    .agg(sum("cantidad").alias("venta_total"),count("compra_id").alias("total_compras")) 

In [13]:
compras_promedio_ventas_mensual_anual = compras_3.groupBy("month","year")\
    .agg(sum("cantidad").alias("avg_venta_mensual")) 

In [15]:
compras_promedio_ventas_mensual_anual_2 = compras_promedio_ventas_mensual_anual.groupBy("year")\
    .agg(avg("avg_venta_mensual").alias("avg_venta_mensual")) 

In [26]:
 compras_anuales_2 = compras_anuales.join(compras_promedio_ventas_mensual_anual_2,compras_anuales.year ==  compras_anuales.year,"outer").select(compras_anuales.year, compras_anuales.venta_total,compras_anuales.total_compras,compras_promedio_ventas_mensual_anual_2.avg_venta_mensual)

In [31]:
compras_anuales_2=compras_anuales_2.withColumn("avg_venta_mensual",compras_anuales_2.avg_venta_mensual.cast('int')) \

In [33]:
compras_anuales_2.write \
  .format("bigquery") \
  .option("table","becade_rgarciaf.compras_anuales") \
  .option("temporaryGcsBucket", "amazon_bucket_ramiro") \
  .mode('overwrite') \
  .save()

In [None]:
################################################## compras_mensuales ##################################################

In [119]:
compras_mensuales =  compras_3.groupBy("month","year")\
    .agg(sum("cantidad").alias("venta_total_mes"),count("compra_id").alias("total_compras_mes")) 

In [113]:
import pyspark.sql.functions as func
from pyspark.sql.window import Window
from pyspark.sql.functions import lit

In [125]:
compras_mensuales_2 = compras_mensuales.withColumn('venta_total_mes_anterior',func.lag(compras_mensuales['venta_total_mes']).over(Window.orderBy("month","year")))

In [None]:
compras_mensuales_2.na.fill(0)

In [147]:
compras_mensuales_2.write \
  .format("bigquery") \
  .option("table","becade_rgarciaf.compras_mensuales") \
  .option("temporaryGcsBucket", "amazon_bucket_ramiro") \
  .mode('overwrite') \
  .save()