In [None]:
import os
import pyspark.sql.functions as F
import dotenv
from gcpspark import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_date, date_sub, col
import time

In [None]:
# Deployment environment name, read from the ENVIRONMENT variable. Indexing
# os.environ (rather than os.getenv) raises KeyError right here when the
# variable is unset. The value is used below as the prefix of the temporary
# GCS bucket name passed to the BigQuery writer.
ENV = os.environ["ENVIRONMENT"]

In [None]:
# Connection settings handed to create_pyspark (star-imported from gcpspark).
# The AWS key pair is read from the environment; os.getenv yields None when a
# variable is missing, so a missing credential is not detected on this line.
# The two "sas*" entries are passed as empty strings.
properties = {
    "accesskey": os.getenv('colombia_sm_aws_access_key_id'),
    "secretkey": os.getenv('colombia_sm_aws_secret_access_key'),
    "sas_url": "",
    "sas": "",
}

# NOTE(review): the "-br" suffix looks inherited from another country's job
# while this notebook reads Colombia data -- confirm before renaming.
spark_name = "data-extraction-br"

# Session object used by every read/write in the cells below.
spark = create_pyspark(
    name=spark_name,
    connection="s3",
    properties=properties,
    verbose=True,
)

In [None]:
def transformation_stock(df):
    """Reduce a stock DataFrame to yesterday's rows with positive units.

    Keeps only the columns tienda_stock, sku_stock, ean_stock, fecha_stock
    and unidades_stock, then retains rows whose fecha_stock equals the
    current date minus one day and whose unidades_stock is greater than 0.

    Args:
        df: Spark DataFrame containing at least the five columns above.

    Returns:
        The projected and filtered Spark DataFrame.
    """
    stock_columns = (
        "tienda_stock",
        "sku_stock",
        "ean_stock",
        "fecha_stock",
        "unidades_stock",
    )
    yesterday = F.date_sub(F.current_date(), 1)

    projected = df.select(*stock_columns)
    from_yesterday = projected.filter(F.col("fecha_stock") == yesterday)
    return from_yesterday.filter(F.col("unidades_stock") > 0)
    
def transformation_active(df):
    """Keep rows whose estado_producto is "S" and drop that column.

    Args:
        df: Spark DataFrame with an estado_producto column.

    Returns:
        The filtered Spark DataFrame without estado_producto.
    """
    # Presumably "S" flags an active product (going by the function name);
    # verify against the source system's documentation.
    has_s_flag = F.col("estado_producto") == "S"
    kept_rows = df.filter(has_s_flag)
    return kept_rows.drop("estado_producto")

In [5]:
# S3 bucket that holds the parquet exports read by this notebook.
bucket_name = 'cencosud.prod.sandbox.sm.col'
# s3://cencosud.prod.sandbox.sm.col/ANALYTICS/ARCUS/
# s3://cencosud.prod.super.peru.analytics/ExternalAccess/arcus_smk_pe/automatico/vw_milocal_fact_stock_mat

# Retry policy for sources that come back empty: up to MAX_READ_ATTEMPTS
# reads, waiting RETRY_WAIT_SECONDS between consecutive attempts.
MAX_READ_ATTEMPTS = 17
RETRY_WAIT_SECONDS = 60 * 10

# Value passed as the connector's "temporaryGcsBucket" option; every source
# uses the same one.
TEMP_GCS_BUCKET = f"{ENV}-bucket-dataproc-bigquery/dataproc/co_extraction"

# One entry per S3 source:
#   prefix          glob (relative to bucket_name) of the parquet files
#   bucket          temporary GCS location used by the BigQuery writer
#   table           destination BigQuery table
#   mode            "overwrite" replaces the table; anything else appends
#                   only the rows not already present in the table
#   transformation  optional callable applied to the DataFrame after reading
sources = [
    {
        "prefix": "ANALYTICS/ARCUS/Maestra_Activa_Colombia*",
        "bucket": TEMP_GCS_BUCKET,
        "table": "data_analytics_default.BQ_CO_SM_ACTIVE_01",
        "mode": "overwrite",
        "transformation": transformation_active
    },
    {
        "prefix": "ANALYTICS/ARCUS/Maestra_Canal_Ventas_Colombia*",
        "bucket": TEMP_GCS_BUCKET,
        "table": "data_analytics_default.BQ_CO_SM_CHANNELS_01",
        "mode": "overwrite"
    },
    {
        "prefix": "ANALYTICS/ARCUS/Maestra_Promocion_Colombia*",
        "bucket": TEMP_GCS_BUCKET,
        "table": "data_analytics_default.BQ_CO_SM_PROMOTIONS_01",
        "mode": "overwrite"
    },
    {
        "prefix": "ANALYTICS/ARCUS/Maestra_Tienda_Colombia*",
        "bucket": TEMP_GCS_BUCKET,
        "table": "data_analytics_default.BQ_CO_SM_STORES_01",
        "mode": "overwrite"
    },
    {
        "prefix": "ANALYTICS/ARCUS/Productos_Arcus_Colombia*",
        "bucket": TEMP_GCS_BUCKET,
        "table": "data_analytics_default.BQ_CO_SM_PRODUCTS_01",
        "mode": "append"
    },
    {
        "prefix": "ANALYTICS/ARCUS/Ventas_Arcus_Colombia*",
        "bucket": TEMP_GCS_BUCKET,
        "table": "data_analytics_default.BQ_CO_SM_SALES_01",
        "mode": "append"
    },
    {
        "prefix": "ANALYTICS/ARCUS/Stock_Arcus_Colombia*",
        "bucket": TEMP_GCS_BUCKET,
        "table": "data_analytics_default.BQ_CO_SM_STOCK_01",
        "mode": "overwrite",
        "transformation": transformation_stock
    }
]


def _read_source(source):
    """Read one source from S3, retrying while the result is empty.

    The parquet files matching the source's prefix are read, the optional
    "transformation" is applied, and the rows are counted. An empty result
    is retried up to MAX_READ_ATTEMPTS times in total.

    Args:
        source: One entry of ``sources``.

    Returns:
        Tuple ``(dataframe, row_count)`` from the last attempt. ``row_count``
        is 0 when every attempt came back empty.
    """
    path = f"s3a://{bucket_name}/{source['prefix']}"
    transform = source.get("transformation")

    for attempt in range(1, MAX_READ_ATTEMPTS + 1):
        # Re-read on every attempt so newly landed files are picked up.
        data = spark.read.parquet(path)
        if transform is not None:
            data = transform(data)

        row_count = data.count()
        print("# new data: ", row_count)
        if row_count > 0:
            break

        print("----> WARN: Not uploaded to bigquery because no data.")
        # Waiting after the final attempt would only delay the next source.
        if attempt < MAX_READ_ATTEMPTS:
            time.sleep(RETRY_WAIT_SECONDS)

    return data, row_count


def _write_to_bigquery(data, source, mode):
    """Write ``data`` to the source's BigQuery table with the given save mode."""
    data.write.format("bigquery") \
        .option("temporaryGcsBucket", source["bucket"]) \
        .option("table", source["table"]) \
        .mode(mode) \
        .save()


def _append_unique_rows(new_data, source):
    """Append to BigQuery only the rows of ``new_data`` not already stored."""
    # Read the historical data from the BigQuery table into a DataFrame
    historical_data = spark.read.format("bigquery") \
        .option("table", source["table"]) \
        .load()

    # Drop the partition pseudo-columns (if present) so the schema lines up
    # with new_data for the row-wise comparison below.
    historical_data = historical_data.drop("_PARTITIONTIME", "_PARTITIONDATE")

    # Identify the new data that doesn't exist in the historical data
    new_data_unique = new_data.exceptAll(historical_data)

    n_new_data = new_data_unique.count()
    print("# historical data: ", historical_data.count())
    print("# new unique data: ", n_new_data)

    if n_new_data == 0:
        print("----> WARN: Not uploaded to bigquery because no unique data.")
        return

    _write_to_bigquery(new_data_unique, source, "append")


try:
    for source in sources:
        print(f"Initialization: {source['table']}")
        new_data, len_new_data = _read_source(source)

        if len_new_data == 0:
            # Every retry came back empty. Writing now would replace an
            # "overwrite" table with zero rows, so leave the table as it is.
            print("----> WARN: No data after all retries; table left untouched.")
            continue

        if source["mode"] == "overwrite":
            _write_to_bigquery(new_data, source, "overwrite")
            print("Data overwritten!")
        else:
            _append_unique_rows(new_data, source)
finally:
    # Stop the SparkSession even when a source fails, so it is not leaked.
    spark.stop()

Data overwritten!


In [6]:
# Completion marker printed once this final cell is reached.
print("done")

done
