# 1. Data Extraction
## Extracción de los datos que están en el contenedor del Storage Account de Microsoft Azure.

### Primero definimos las variables necesarias para ingresar al contenedor

In [None]:
# Connection settings for the Azure Storage Account container.
# NOTE(review): the account name and key below are literal placeholders; in a
# real workspace the key would normally come from a secret store - confirm.
container = "datos"
storage_account_name = "Storage Account Name"
storage_account_access_key = "Account Access Key"

# Blob endpoint host, shared by the Spark config key and the wasbs URL.
blob_endpoint = f"{storage_account_name}.blob.core.windows.net"
spark.conf.set(f"fs.azure.account.key.{blob_endpoint}", storage_account_access_key)

# List the entries at the container root (path, name and other properties).
files_raw = dbutils.fs.ls(f"wasbs://{container}@{blob_endpoint}/")

In [None]:
# Build the list of URLs, one per entry returned by dbutils.fs.ls. The path is
# read from the entry's `path` attribute instead of being cut out of the
# entry's string representation, which breaks if a path contains a quote.
file_list = [entry.path for entry in files_raw]
# Remove the last entry, which is expected to be the metadata file: loading it
# causes problems at execution time.
metadata = file_list.pop()
# File names are the 4th '/'-separated piece of "wasbs://<container>@<host>/<name>".
file_names = [path.split('/')[3] for path in file_list]

### Creamos una función que reciba el path del archivo y nos genere el dataframe

In [None]:
def get_df(file_location, file_type="json"):
    """Load a file from the storage account into a Spark DataFrame.

    Args:
        file_location: Path or URL passed to ``load``.
        file_type: Spark data source format name. Defaults to ``"json"``,
            the value that used to be hard-coded, so existing callers are
            unaffected.

    Returns:
        The DataFrame returned by ``spark.read...load(file_location)``.
    """
    # Set the storage account key on the Spark session before reading.
    spark.conf.set(
        f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net",
        storage_account_access_key,
    )
    reader = spark.read.format(file_type).option("inferSchema", "true")
    return reader.load(file_location)

### Creamos los dataframes a partir de la función previamente definida

In [None]:
# Create one DataFrame per category file. The files are taken by position
# (indices 0 through 22 of file_list), in the same order as the names below.
(
    amazon_instant_video,
    apps_for_android,
    automotive,
    baby,
    beauty,
    books,
    cds_and_vinyl,
    cell_phones_accessories,
    clothing_shoes_jewelry,
    digital_music,
    electronics,
    grocery_and_gourmet_food,
    health_and_personal,
    home_and_kitchen,
    kindle_store,
    movies_and_tv,
    musical_instruments,
    office_products,
    patio_lawn_garden,
    pet_supplies,
    sports_and_outdoors,
    toys_and_games,
    videogames,
) = [get_df(file_list[position]) for position in range(23)]

# 2. Transformación
## Continuamos con la transformación de nuestros dataframes. Afortunadamente, comparten las mismas columnas

In [None]:
# Uniendo los dataframes
from pyspark.sql import DataFrame
from functools import reduce
def unionAll(*dfs):
    """Combine the given DataFrames into one.

    Applies ``DataFrame.unionAll`` pairwise, left to right, over ``dfs`` and
    returns the accumulated result.
    """
    return reduce(DataFrame.unionAll, dfs)
# Union every category DataFrame into a single one.
df_complete = unionAll(
    amazon_instant_video,
    apps_for_android,
    automotive,
    baby,
    beauty,
    books,
    cds_and_vinyl,
    cell_phones_accessories,
    clothing_shoes_jewelry,
    digital_music,
    electronics,
    grocery_and_gourmet_food,
    health_and_personal,
    home_and_kitchen,
    kindle_store,
    movies_and_tv,
    musical_instruments,
    office_products,
    patio_lawn_garden,
    pet_supplies,
    sports_and_outdoors,
    toys_and_games,
    videogames,
)

In [None]:
# Shape of the combined DataFrame: number of rows, then number of columns.
row_count = df_complete.count()
column_count = len(df_complete.columns)
print(row_count, column_count)

In [None]:
# Rename the "asin" column to "asinID" and discard the "reviewTime" column.
df_complete = (
    df_complete
    .withColumnRenamed("asin", "asinID")
    .drop("reviewTime")
)

In [None]:
# Expose elements 0 and 1 of the "helpful" column as two separate columns,
# then drop the original "helpful" column.
from pyspark.sql.functions import expr
kept_columns = ["asinID", "reviewText", "reviewerID", "reviewerName", "summary", "unixReviewTime", "helpful"]
helpful_elements = [expr(f"helpful[{position}]") for position in range(2)]
df_complete = df_complete.select(kept_columns + helpful_elements).drop("helpful")

In [None]:
# Convert the Unix timestamp into a "datetime" column to make filtering easier,
# then drop the raw timestamp column.
from pyspark.sql.functions import from_unixtime
# The dropped name uses the column's exact casing ("unixReviewTime"). With a
# different casing the drop only works while Spark resolves column names
# case-insensitively; otherwise it silently removes nothing.
df_complete = df_complete.withColumn("datetime", from_unixtime("unixReviewTime")).drop("unixReviewTime")

In [None]:
# Sort the rows by "datetime" in descending order.
df_complete = df_complete.sort("datetime", ascending=False)

In [None]:
# Preview the transformed DataFrame (arguments spelled out at their defaults).
df_complete.show(n=20, truncate=True)

# 3. Load: Creando la base de datos con nuestros datos transformados

In [None]:
# Save the transformed data as a Parquet table named "reviews" in the
# Databricks SQL database.
df_complete.write.saveAsTable("reviews", format="parquet")

In [None]:
def update_database(df, mode="overwrite"):
    """Export a Spark DataFrame to the "reviews" table of the Azure SQL database.

    Args:
        df: Spark DataFrame to export; its ``write.jdbc`` method is used.
        mode: Save mode passed to the JDBC writer. Defaults to
            ``"overwrite"``, the value that used to be hard-coded; pass
            ``"append"`` to add rows instead of replacing the table.

    Returns:
        Whatever ``df.write.jdbc`` returns.
    """
    jdbc_hostname = "database_server_name.database.windows.net"
    jdbc_port = "1433"
    jdbc_database = "database_name"
    # NOTE(review): placeholder credentials; presumably these should come from
    # a secret store rather than the notebook source - confirm.
    connection_properties = {"user": "Database login Username", "password": "Password"}

    url = f"jdbc:sqlserver://{jdbc_hostname}:{jdbc_port};database={jdbc_database}"

    # Use the DataFrame's own writer. Wrapping df in DataFrameWriter relied on
    # a name that is never imported in the visible code.
    return df.write.jdbc(url=url, table="reviews", mode=mode, properties=connection_properties)

In [None]:
# Queries for the incremental ("delta") ingestion, one range of years each.
# YEARS: 2010 - 2014
query_2010_2014 = "SELECT * FROM reviews WHERE YEAR(datetime) BETWEEN '2010' AND '2014' "
df_2010_2014 = spark.sql(query_2010_2014)

In [None]:
# Export the 2010-2014 slice to the Azure SQL "reviews" table.
update_database(df=df_2010_2014)

In [None]:
# YEARS: 2007 - 2009. The range now matches the variable name; the query
# previously repeated the 2010-2014 range.
df_2007_2009 = spark.sql("SELECT * FROM reviews WHERE YEAR(datetime) BETWEEN '2007' AND '2009' ")

In [None]:
# YEARS: 2004 - 2006
query_2004_2006 = "SELECT * FROM reviews WHERE YEAR(datetime) BETWEEN '2004' AND '2006' "
df_2004_2006 = spark.sql(query_2004_2006)

In [None]:
# YEARS: 2000 - 2003. The range now matches the variable name; the query
# previously repeated the 2010-2014 range.
df_2000_2003 = spark.sql("SELECT * FROM reviews WHERE YEAR(datetime) BETWEEN '2000' AND '2003' ")