# Lab 10 Asignación 2
Al fnal de año hay muchos departamentos de la compañía que necesitan diferentes datos para presentar sus informes externos o para analizar los resultados.

El director de Data y Business Intelligence se ha reunido con su equipo de Data Engineering y les ha comunicado los datasets que tienen que generar para responder a a las diferentes peticiones y a las preguntas de negocio.

Para llevar la tarea a cabo se han ingestado diferentes ficheros de texto procedentes de las diferentes bases de datos de los sistemas de gestión de la compañía.

1. "transactions-asignment.csv". Es el fichero con todas las transaccioned del año 2018 con columnas: TransactionID, CustomerID, Date, Product, Group, Amount, PaymentType, Country.
2. "customer-data-asignment.csv". Es el fichero con los datos de los clientes y con columnas CustomerID, CardNumber. Contiene la relación entre cada cliente y su número de tarjeta de crédito.
3. "country-vat.csv". Es el fichero con las columnas: VAT, Country. Contiene la relación de cada país con el IVA soportado para los productos.
4. Los productos se han nominado con etiquetas: A to J.
5. Los grupos a los que pertenecen los productos son: 'Food', 'Leisure', 'Restaurant', 'Gym', 'Gambling', 'Travel', 'Learning'.
6. El tipo de pago contiene: 'Visa', 'Cash', 'American', 'PayPal', 'Mastercard', 'Check'.
7. Los países desde donde se han producido las transacciones son: 'Italy', 'France', 'Germany', 'USA', 'Brasil', 'UK', 'Switzerland', 'Sweden', 'Denmark', 'Canada', 'Mexico', 'Russia'.

Los informes que se piden son los siguientes:
1. El departamento de Marketing de productos quiere analizar las transacciones mensuales por país respecto al cada grupo y a cada producto dentro del grupo. Por esta razón necesita dos Dataframes: uno agrupado por cada grupo y otro agrupado por cada producto dentro de cada grupo. Las columnas para el primer DataFrame deberían ser: *Country, Month, Group, TotalMonth, AvgMonth, Number of Transactions*. Y para el segundo deberían ser: *Country, Month, Group, Product, TotalMonth, AvgMonth, Number of Transactions*. Cuántos registros tiene cada DataFrame?
2. Podríamos decir al departamento de Marketing *cuál es el país y el mes con el mayor importe de las transacciones para el grupo Restaurant*? Cuántas transacciones se han realizado para ese país y ese mes?
3. Podríamos decir al departamento de Marketing *cuál es el producto que más importe mensual ha tenido para el grupo de Gambling y cuál es el país y el mes*?
4. El departamento financiero necesita liquidar el IVA anual (el total para cada transacción es bruto) para cada país con las autoridades del Tesoro. Podríamos decir al departameto financiero *cuál es el país con el que debemos liquidar la mayor cantidad de IVA y cuál es esa cantidad*? Y *cuál es el país con el que deberemos liquidar el menor importe de IVA*? 
5. También necesita saber el total anual por tipo de pago (excepto los pagos en cash) para liquidar con los bancos. Podríamos decir al departamento financiero *cuál es la cantidad total por tipo de pago*?
6. Quién es nuestro mejor cliente y cuál es su número de tarjeta de crédito?
7. Calcular el total por tipo de pago procedente de USA y de fuera de USA.
8. Cuál es la proporción entre lo que viene de USA y lo que viene de fuera de USA por tipo de pago?

Hint para 4: Para calcular la liquidación por IVA, deberemos hacer un `join` del Dataframe de transacciones con el Dataframe de países, reando una nueva columna con el IVA correspondiente a cada transacción y luego agrupar por país y agregar la suma de la columna creada.

Hint para 5: Deberemos filtrar el Dataframe para suprimir los pagos en cash, agrupar por tipo de pago y agregar la suma total.

In [1]:
import os
import sys

os.environ['SPARK_HOME'] = "C:\\spark-3.0.3-bin-hadoop2.7\\"
SPARK_HOME = os.environ['SPARK_HOME']
# PYTHONPATH=$SPARK_HOME/python:$SPARK_HOME/python/lib/py4j-0.10.9-src.zip:$PYTHONPATH
# Añadimos los correspondientes paths de las librerias de python
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "lib"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "lib","pyspark.zip"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "lib", "py4j-0.10.9-src.zip"))

# Importamos la funcion
from pyspark.sql import SparkSession

# Creamos la sesion
spark = SparkSession \
            .builder \
            .master("local[*]") \
            .appName("Lab8") \
            .config("spark.executor.memory", "6g") \
            .config("spark.cores.max", "4") \
            .getOrCreate()


# Creamos el sparkContext de la sesion    
sc = spark.sparkContext

In [2]:
sc

In [3]:
sc.getConf().getAll()

[('spark.executor.id', 'driver'),
 ('spark.app.id', 'local-1647602475351'),
 ('spark.cores.max', '4'),
 ('spark.app.startTime', '1647602474522'),
 ('spark.executor.memory', '6g'),
 ('spark.app.name', 'Lab8'),
 ('spark.driver.host', 'Arrakis'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.port', '52537'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

### 1. Creating a DataFrame

In [4]:
transaction_asg = spark.read \
             .format('csv') \
             .option("inferSchema", "true") \
             .option("delimiter", ",") \
             .option('header','true') \
             .load('../data/transactions-asignment.csv')

transaction_asg.show()

+--------------------+----------+-------------------+-------+----------+------+-----------+-----------+
|       TransactionID|CustomerID|               Date|Product|     Group|Amount|PaymentType|    Country|
+--------------------+----------+-------------------+-------+----------+------+-----------+-----------+
|7564e71a-3050-11e...|      4255|2018-07-18 02:36:00|      I|       Gym| 231.0|   American|     Sweden|
|7565ad98-3050-11e...|     30514|2018-03-08 07:37:00|      D|       Gym|  59.7|      Check|     Mexico|
|7565ad99-3050-11e...|      3853|2018-12-04 10:25:00|      B|    Travel|3782.0|       Cash|     Mexico|
|7565ad9a-3050-11e...|     49729|2018-04-12 19:23:00|      I|  Gambling| 231.0|      Check|      Italy|
|7565ad9b-3050-11e...|     33467|2018-02-01 23:17:00|      G|      Food|289.26| Mastercard|     Sweden|
|7565ad9c-3050-11e...|     45120|2018-09-08 11:21:00|      I|Restaurant|  23.1|      Check|         UK|
|7565ad9d-3050-11e...|      9965|2018-11-06 08:27:00|      C|   

Primero, construimos las dos nuevas columnas para el mes y el año en el Dataframe `transactionDF` que nos harán falta para contestar a las preguntas de negocio

In [5]:
from pyspark.sql import functions as f

transactionDF = transaction_asg.withColumn('Year',f.split(transaction_asg.Date, '-')[0]) \
             .withColumn('Month',f.split(transaction_asg.Date, '-')[1]) \
             .drop('Date')
transactionDF.show()


+--------------------+----------+-------+----------+------+-----------+-----------+----+-----+
|       TransactionID|CustomerID|Product|     Group|Amount|PaymentType|    Country|Year|Month|
+--------------------+----------+-------+----------+------+-----------+-----------+----+-----+
|7564e71a-3050-11e...|      4255|      I|       Gym| 231.0|   American|     Sweden|2018|   07|
|7565ad98-3050-11e...|     30514|      D|       Gym|  59.7|      Check|     Mexico|2018|   03|
|7565ad99-3050-11e...|      3853|      B|    Travel|3782.0|       Cash|     Mexico|2018|   12|
|7565ad9a-3050-11e...|     49729|      I|  Gambling| 231.0|      Check|      Italy|2018|   04|
|7565ad9b-3050-11e...|     33467|      G|      Food|289.26| Mastercard|     Sweden|2018|   02|
|7565ad9c-3050-11e...|     45120|      I|Restaurant|  23.1|      Check|         UK|2018|   09|
|7565ad9d-3050-11e...|      9965|      C|    Travel|2590.0|      Check|Switzerland|2018|   11|
|7565ad9e-3050-11e...|     20404|      J|   Leisur

### 1. Datasets para el departamento de marketing de producto

#### 1.a Contruimos los DataFrames

In [6]:
transaction_group = transactionDF.groupBy('Country','Group','Month') \
             .agg(f.sum('Amount').cast("decimal(12,2)").alias('TotalMonth'),f.round(f.mean('Amount'),2).alias('AvgMonth'),f.count('TransactionID').alias('NumberTransactions'))
transaction_group.show()


+-----------+----------+-----+----------+--------+------------------+
|    Country|     Group|Month|TotalMonth|AvgMonth|NumberTransactions|
+-----------+----------+-----+----------+--------+------------------+
|    Germany|  Gambling|   10| 346577.10|  341.46|              1015|
|     Sweden|      Food|   10| 248074.74|  265.32|               935|
|    Denmark|Restaurant|   10| 105410.08|  106.69|               988|
|     Brasil|       Gym|   06| 179087.93|  180.35|               993|
|      Italy|  Learning|   09| 195458.00|  204.45|               956|
|        USA|       Gym|   05| 181105.56|  184.99|               979|
|     Russia|      Food|   08| 257216.29|  265.72|               968|
|     France|   Leisure|   10|  40978.78|   40.49|              1012|
|Switzerland|   Leisure|   06|  43255.43|   41.95|              1031|
|      Italy|  Gambling|   01| 332870.00|  335.89|               991|
|     Mexico|    Travel|   09|2738803.00| 2703.66|              1013|
|        USA|   Leis

In [7]:
transaction_product = transactionDF.groupBy('Country','Group','Month','Product') \
             .agg(f.sum('Amount').cast("decimal(12,2)").alias('TotalMonth'),f.round(f.mean('Amount'),2).alias('AvgMonth'),f.count('TransactionID').alias('Number of Transactions'))
transaction_product.show()


+-----------+----------+-----+-------+----------+--------+----------------------+
|    Country|     Group|Month|Product|TotalMonth|AvgMonth|Number of Transactions|
+-----------+----------+-----+-------+----------+--------+----------------------+
|    Germany|   Leisure|   02|      F|   4578.33|   45.33|                   101|
|     Russia|      Food|   12|      B|  48298.20|   478.2|                   101|
|      Italy|Restaurant|   03|      C|  27195.00|   259.0|                   105|
|     Canada|       Gym|   05|      G|  31501.00|   289.0|                   109|
|     Mexico|  Gambling|   01|      G|  31212.00|   289.0|                   108|
|Switzerland|  Learning|   11|      B|  30189.00|   347.0|                    87|
|      Italy|    Travel|   06|      I| 291060.00|  2310.0|                   126|
|     France|    Travel|   12|      D|  60894.00|   597.0|                   102|
|        USA|   Leisure|   09|      G|   3121.20|    28.9|                   108|
|     Brasil|   

#### 1.b Número de registros

In [8]:
print('Numero de registros en el primer DataFrame: ', transaction_group.count())
print('Numero de registros en el segundo DataFrame: ', transaction_product.count())

Numero de registros en el primer DataFrame:  1008
Numero de registros en el segundo DataFrame:  10080


### 2. Respuesta a cuál es el país y el mes con mayor cantidad de venta para el grupo Restaurante

In [9]:
transaction_group.createOrReplaceTempView("trans_group_t")

In [10]:
spark.sql("SELECT country,month,group,TotalMonth,NumberTransactions FROM trans_group_t WHERE group= 'Restaurant' ORDER BY TotalMonth DESC LIMIT 1").show()

+-------+-----+----------+----------+------------------+
|country|month|     group|TotalMonth|NumberTransactions|
+-------+-----+----------+----------+------------------+
|  Italy|   04|Restaurant| 118583.97|              1060|
+-------+-----+----------+----------+------------------+



### 3. Respuesta a cuál es el producto con la mayor venta en Gambling y cuál es el país y el mes en los que se ha producido esa venta

In [11]:
transaction_product.createOrReplaceTempView("trans_product_t")

In [12]:
spark.sql("SELECT country,month,Group,Product,TotalMonth FROM trans_product_t WHERE Group='Gambling' ORDER BY TotalMonth DESC LIMIT 1").show()

+-------+-----+--------+-------+----------+
|country|month|   Group|Product|TotalMonth|
+-------+-----+--------+-------+----------+
| Russia|   10|Gambling|      D|  71640.00|
+-------+-----+--------+-------+----------+



### 4. Respuesta a la cantidad de IVA a liquidar con cada país

In [13]:
country_vat = spark.read \
             .format('csv') \
             .option("inferSchema", "true") \
             .option("delimiter", ",") \
             .option('header','true') \
             .load('../data/country-vat.csv')

country_vat.show()

+---+-----------+
|VAT|    Country|
+---+-----------+
| 21|      Italy|
| 19|     France|
| 18|    Germany|
| 20|        USA|
| 18|     Brasil|
| 18|         UK|
| 18|Switzerland|
| 18|     Sweden|
| 19|    Denmark|
| 21|     Canada|
| 18|     Mexico|
| 21|     Russia|
+---+-----------+



In [14]:
transaction_country = transactionDF.groupBy('Country') \
             .agg(f.sum('Amount').alias('TotalMonth'))
transaction_country.show()


+-----------+--------------------+
|    Country|          TotalMonth|
+-----------+--------------------+
|     Russia|4.5913706530000106E7|
|     Sweden| 4.583002558000009E7|
|    Germany| 4.565745817000011E7|
|     France| 4.547257416000011E7|
|      Italy| 4.567639189000008E7|
|    Denmark| 4.535337095000008E7|
|        USA|4.5964789340000145E7|
|     Mexico| 4.588160267000013E7|
|         UK| 4.554704884000009E7|
|Switzerland| 4.589588048000009E7|
|     Canada|4.5358978390000135E7|
|     Brasil| 4.558835685000007E7|
+-----------+--------------------+



In [15]:
transaction_country1 = transaction_country.withColumnRenamed('Country','CountryName')
transaction_country_vat = transaction_country1.join(country_vat, transaction_country1.CountryName == country_vat.Country,'left').drop('CountryName')
                                                                  
transaction_country_vat.show()

+--------------------+---+-----------+
|          TotalMonth|VAT|    Country|
+--------------------+---+-----------+
|4.5913706530000106E7| 21|     Russia|
| 4.583002558000009E7| 18|     Sweden|
| 4.565745817000011E7| 18|    Germany|
| 4.547257416000011E7| 19|     France|
| 4.567639189000008E7| 21|      Italy|
| 4.535337095000008E7| 19|    Denmark|
|4.5964789340000145E7| 20|        USA|
| 4.588160267000013E7| 18|     Mexico|
| 4.554704884000009E7| 18|         UK|
| 4.589588048000009E7| 18|Switzerland|
|4.5358978390000135E7| 21|     Canada|
| 4.558835685000007E7| 18|     Brasil|
+--------------------+---+-----------+



In [16]:

transaction_country1 = transaction_country.withColumnRenamed('Country','CountryName')
transaction_country_vat = transaction_country1.join(country_vat, transaction_country1.CountryName == country_vat.Country,'left').drop('CountryName')\
                            .withColumn("Tax", (f.round(f.col('TotalMonth')*(f.col('VAT'))/(f.col('VAT')+100))))
                                                                  
transaction_country_vat.show()

+--------------------+---+-----------+---------+
|          TotalMonth|VAT|    Country|      Tax|
+--------------------+---+-----------+---------+
|4.5913706530000106E7| 21|     Russia|7968495.0|
| 4.583002558000009E7| 18|     Sweden|6991021.0|
| 4.565745817000011E7| 18|    Germany|6964697.0|
| 4.547257416000011E7| 19|     France|7260327.0|
| 4.567639189000008E7| 21|      Italy|7927308.0|
| 4.535337095000008E7| 19|    Denmark|7241295.0|
|4.5964789340000145E7| 20|        USA|7660798.0|
| 4.588160267000013E7| 18|     Mexico|6998889.0|
| 4.554704884000009E7| 18|         UK|6947855.0|
| 4.589588048000009E7| 18|Switzerland|7001067.0|
|4.5358978390000135E7| 21|     Canada|7872219.0|
| 4.558835685000007E7| 18|     Brasil|6954156.0|
+--------------------+---+-----------+---------+



El país con mayor IVA a liquidar es: (Russia|8,003,340.04) y el menor (UK|6,947,854.91)

### 5. Respuesta a la cantidad total por tipo de pago (excepto pagos en cash) a liquidar con nuestro banco

In [17]:
total_type_payment = transaction_asg.filter(transaction_asg.PaymentType != "Cash")\
                            .groupBy("PaymentType")\
                            .agg(f.sum("Amount").cast("decimal(12,2)").alias("Total"))\
                            .sort("Total", ascending=False)

total_type_payment.show()

+-----------+-----------+
|PaymentType|      Total|
+-----------+-----------+
|      Check|91842289.77|
|       Visa|91727292.28|
| Mastercard|91343170.82|
|   American|90893043.24|
|     PayPal|90871507.94|
+-----------+-----------+



### 6. Respuesta a quién es nuestro mejor cliente y cual es su número de tarjeta de crédito

In [41]:
customer_DF = spark.read\
            .format("csv")\
            .option("inferSchema", "true")\
            .option("header", "true")\
            .load("../data/customer-data-asignment.csv")

customer_DF.show()

+----------+--------------+
|CustomerID|    CardNumber|
+----------+--------------+
|         0|15056453071315|
|         1|15056453588315|
|         2|15056453027428|
|         3|15056453920562|
|         4|15056453139712|
|         5|15056453728136|
|         6|15056453811382|
|         7|15056453319623|
|         8|15056453773603|
|         9|15056453113544|
|        10|15056453466129|
|        11|15056453388773|
|        12|15056453171508|
|        13|15056453557940|
|        14|15056453946043|
|        15|15056453937730|
|        16|15056453042890|
|        17|15056453318771|
|        18|15056453544763|
|        19|15056453173430|
+----------+--------------+
only showing top 20 rows



In [42]:
Trans3 = transactionDF.withColumnRenamed("CustomerID", "Customer_ID3")
#Trans3.show()
BestCustomer = Trans3.groupBy("Customer_ID3")\
                .agg(f.round(f.sum("Amount"),3).alias("TotalCustomer"))\
                .sort("TotalCustomer", ascending=False)

#BestCustomer.show()
BestCustomerData =   BestCustomer.join(customer_DF, BestCustomer["Customer_ID3"] == customer_DF["CustomerID"], "left")\
                    .drop("CustomerID")
                    

BestCustomerData.show(1)


+------------+-------------+--------------+
|Customer_ID3|TotalCustomer|    CardNumber|
+------------+-------------+--------------+
|       23271|     15969.39|15056453065678|
+------------+-------------+--------------+
only showing top 1 row



### 7. Cálculo del total procedente de USA por tipo de pago

In [46]:
USA_total = transactionDF.filter(f.col("Country")=="USA")\
                        .groupBy(f.col("PaymentType").alias("PaymentType_USA"))\
                        .agg(f.round(f.sum("Amount"),2).alias("Total_USA"))
USA_total.show()

+---------------+----------+
|PaymentType_USA| Total_USA|
+---------------+----------+
|           Visa|7837635.69|
|          Check|7827298.36|
|     Mastercard|7750294.77|
|         PayPal|7415358.01|
|           Cash|7636967.96|
|       American|7497234.55|
+---------------+----------+



### 8. Cuál es la relación entre el total procedente de USA y el procedente de fuera de USA por tipo de pago

In [47]:
NO_USA = transactionDF.filter(f.col("Country")!="USA")\
                    .groupBy(f.col("PaymentType").alias("PaymentType_NO_USA"))\
                    .agg(f.sum("Amount").cast("decimal(12,2)").alias("Total_NO_USA"))
NO_USA.show()

+------------------+------------+
|PaymentType_NO_USA|Total_NO_USA|
+------------------+------------+
|              Visa| 83889656.59|
|             Check| 84014991.41|
|        Mastercard| 83592876.05|
|            PayPal| 83456149.93|
|              Cash| 83825911.84|
|          American| 83395808.69|
+------------------+------------+



In [60]:
Total =    transactionDF.groupBy("PaymentType")\
                .agg(f.sum("Amount").cast("decimal(12,2)").alias("Total_World"))\

#Total.show()
USA_vs_NO_USA1 =  NO_USA.join(USA_total, NO_USA["PaymentType_NO_USA"] == USA_total["PaymentType_USA"], "left")\
                    .drop("PaymentType_USA")
#USA_vs_NO_USA.show()

USA_vs_NO_USA2 = USA_vs_NO_USA1.join(Total, USA_vs_NO_USA1["PaymentType_NO_USA"] == Total["PaymentType"], "left")\
                    .drop("PaymentType").withColumnRenamed("PaymentType_NO_USA", "PaymentType")\
                    .withColumn("USA_Perc",f.round(f.col("Total_USA")/f.col("Total_World")*100,3))\
                    .withColumn("NO_USA_Perc",f.round(f.col("Total_NO_USA")/f.col("Total_World")*100,3))\


    
USA_vs_NO_USA2.show()

+-----------+------------+----------+-----------+--------+-----------+
|PaymentType|Total_NO_USA| Total_USA|Total_World|USA_Perc|NO_USA_Perc|
+-----------+------------+----------+-----------+--------+-----------+
|       Visa| 83889656.59|7837635.69|91727292.28|   8.544|     91.456|
|      Check| 84014991.41|7827298.36|91842289.77|   8.523|     91.477|
| Mastercard| 83592876.05|7750294.77|91343170.82|   8.485|     91.515|
|     PayPal| 83456149.93|7415358.01|90871507.94|    8.16|     91.840|
|       Cash| 83825911.84|7636967.96|91462879.80|    8.35|     91.650|
|   American| 83395808.69|7497234.55|90893043.24|   8.248|     91.752|
+-----------+------------+----------+-----------+--------+-----------+



Las columnas "USA_Perc" y "NO_USA_Perc" muestran los porcetajes respecto al total mundial por medio de pago 


In [None]:
sc.stop()