# Estudiante: Jason Solano

# Prueba para probar la conexión descripta por el profesor

### Probamos la configuración inical para poder realizar la conexión con postgresql, en una base de datos local

In [38]:
import findspark
findspark.init('/opt/spark')

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, udf 
from pyspark.sql.types import DateType

dir = "/Users/jasonsolano/Documents/BigData/BigDataTEC/Clase1/DB/"
spark = SparkSession \
    .builder \
    .appName("Basic JDBC pipeline") \
    .config("spark.driver.extraClassPath", dir+"postgresql-42.1.4.jar") \
    .config("spark.executor.extraClassPath", dir+"postgresql-42.1.4.jar") \
    .getOrCreate()

# Reading single DataFrame in Spark by retrieving all rows from a DB table.
df = spark \
    .read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost/ShoppingDB") \
    .option("user", "postgres") \
    .option("password", "102800") \
    .option("dbtable", "transactions") \
    .load()

df.show()

+---+-----------+------+-------------------+
| id|customer_id|amount|       purchased_at|
+---+-----------+------+-------------------+
|  1|          1|    55|2017-03-01 09:00:00|
|  2|          1|   125|2017-03-01 10:00:00|
|  3|          1|    32|2017-03-02 13:00:00|
|  4|          1|    64|2017-03-02 15:00:00|
|  5|          1|   128|2017-03-03 10:00:00|
|  6|          2|   333|2017-03-01 09:00:00|
|  7|          2|   334|2017-03-01 09:01:00|
|  8|          2|   333|2017-03-01 09:02:00|
|  9|          2|    11|2017-03-03 20:00:00|
| 10|          2|    44|2017-03-03 20:15:00|
+---+-----------+------+-------------------+



## Realizamos varios ejercicios descritos en el jupyter notebook suministrado por el profesor

In [2]:
formatted_df = df.withColumn("date_string", date_format(col("purchased_at"), 'MM/dd/yyyy'))
formatted_df.show()

+---+-----------+------+-------------------+-----------+
| id|customer_id|amount|       purchased_at|date_string|
+---+-----------+------+-------------------+-----------+
|  1|          1|    55|2017-03-01 09:00:00| 03/01/2017|
|  2|          1|   125|2017-03-01 10:00:00| 03/01/2017|
|  3|          1|    32|2017-03-02 13:00:00| 03/02/2017|
|  4|          1|    64|2017-03-02 15:00:00| 03/02/2017|
|  5|          1|   128|2017-03-03 10:00:00| 03/03/2017|
|  6|          2|   333|2017-03-01 09:00:00| 03/01/2017|
|  7|          2|   334|2017-03-01 09:01:00| 03/01/2017|
|  8|          2|   333|2017-03-01 09:02:00| 03/01/2017|
|  9|          2|    11|2017-03-03 20:00:00| 03/03/2017|
| 10|          2|    44|2017-03-03 20:15:00| 03/03/2017|
+---+-----------+------+-------------------+-----------+



In [3]:
string_to_date = \
    udf(lambda text_date: datetime.strptime(text_date, '%m/%d/%Y'),
        DateType())

typed_df = formatted_df.withColumn("date", string_to_date(formatted_df.date_string))
typed_df.show()
typed_df.printSchema()

+---+-----------+------+-------------------+-----------+----------+
| id|customer_id|amount|       purchased_at|date_string|      date|
+---+-----------+------+-------------------+-----------+----------+
|  1|          1|    55|2017-03-01 09:00:00| 03/01/2017|2017-03-01|
|  2|          1|   125|2017-03-01 10:00:00| 03/01/2017|2017-03-01|
|  3|          1|    32|2017-03-02 13:00:00| 03/02/2017|2017-03-02|
|  4|          1|    64|2017-03-02 15:00:00| 03/02/2017|2017-03-02|
|  5|          1|   128|2017-03-03 10:00:00| 03/03/2017|2017-03-03|
|  6|          2|   333|2017-03-01 09:00:00| 03/01/2017|2017-03-01|
|  7|          2|   334|2017-03-01 09:01:00| 03/01/2017|2017-03-01|
|  8|          2|   333|2017-03-01 09:02:00| 03/01/2017|2017-03-01|
|  9|          2|    11|2017-03-03 20:00:00| 03/03/2017|2017-03-03|
| 10|          2|    44|2017-03-03 20:15:00| 03/03/2017|2017-03-03|
+---+-----------+------+-------------------+-----------+----------+

root
 |-- id: integer (nullable = true)
 |-- cu

In [4]:
sum_df = typed_df.groupBy("customer_id", "date").sum()
sum_df.show()

+-----------+----------+-------+----------------+-----------+
|customer_id|      date|sum(id)|sum(customer_id)|sum(amount)|
+-----------+----------+-------+----------------+-----------+
|          2|2017-03-01|     21|               6|       1000|
|          1|2017-03-02|      7|               2|         96|
|          1|2017-03-03|      5|               1|        128|
|          1|2017-03-01|      3|               2|        180|
|          2|2017-03-03|     19|               4|         55|
+-----------+----------+-------+----------------+-----------+



In [5]:
stats_df = \
    sum_df.select(
        col('customer_id'),
        col('date'),
        col('sum(amount)').alias('amount'))

stats_df.printSchema()
stats_df.show()

root
 |-- customer_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- amount: long (nullable = true)

+-----------+----------+------+
|customer_id|      date|amount|
+-----------+----------+------+
|          2|2017-03-01|  1000|
|          1|2017-03-02|    96|
|          1|2017-03-03|   128|
|          1|2017-03-01|   180|
|          2|2017-03-03|    55|
+-----------+----------+------+



In [6]:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

names_df = spark \
    .read \
    .format("csv") \
    .option("path", "names.csv") \
    .option("header", True) \
    .schema(StructType([
                StructField("id", IntegerType()),
                StructField("name", StringType()),
                StructField("currency", StringType())])) \
    .load()

names_df.printSchema()
names_df.show()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- currency: string (nullable = true)

+---+----+--------+
| id|name|currency|
+---+----+--------+
|  1|John|     CRC|
|  2|Jane|     EUR|
+---+----+--------+



In [7]:
joint_df = stats_df.join(names_df, stats_df.customer_id == names_df.id)
joint_df.printSchema()
joint_df.show()

root
 |-- customer_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- amount: long (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- currency: string (nullable = true)

+-----------+----------+------+---+----+--------+
|customer_id|      date|amount| id|name|currency|
+-----------+----------+------+---+----+--------+
|          2|2017-03-01|  1000|  2|Jane|     EUR|
|          1|2017-03-02|    96|  1|John|     CRC|
|          1|2017-03-03|   128|  1|John|     CRC|
|          1|2017-03-01|   180|  1|John|     CRC|
|          2|2017-03-03|    55|  2|Jane|     EUR|
+-----------+----------+------+---+----+--------+



# Ejercicio
- Cree un pequeño generador de transacciones. Para ello, puede utilizar funciones de numpy o pandas, para crear las transacciones nuevas en memoria y, posteriormente, puede cargarlas a un *Spark Dataframe* que después debe insertar en la base de datos.
- Con los datos nuevos, ejecute el código de éste notebook.

### Realizamos una función para obtener dias random con el inicio de una determinada fecha

In [12]:
import numpy as np
def random_date_generator(start_date, range_in_days):
    days_to_add = np.arange(1, range_in_days)
    random_date = np.datetime64(start_date) + np.random.choice(days_to_add)
    return random_date
print(random_date_generator('2017-01-01 01:00:00',360))

2017-01-01T01:01:11


### se realiza una lista que guarda registros con los atributos requeridos para la tabla transactions

In [96]:
limite = 10
contador = 0
listData = list()
while(True):
    contador+=1
    if(contador == limite):
        break
    else:
        randomId = np.random.choice(2)
        randomAmount = np.random.choice(1000)
        randomDate = random_date_generator('2017-01-01',360)
        listData.append((randomId,randomAmount,randomDate))
print(listData)
        

[(0, 788, numpy.datetime64('2017-10-24')), (1, 857, numpy.datetime64('2017-02-02')), (0, 490, numpy.datetime64('2017-11-21')), (1, 39, numpy.datetime64('2017-11-27')), (0, 907, numpy.datetime64('2017-02-28')), (0, 351, numpy.datetime64('2017-06-16')), (1, 158, numpy.datetime64('2017-03-24')), (1, 373, numpy.datetime64('2017-03-04')), (1, 227, numpy.datetime64('2017-06-26'))]


### Se crea un dataframe con los registros con valores random, basados en la tabla transactions. 
### Para este caso es necesario crear una columna "date" con datos en string y una columna "purchased_at" con formato timestamp.

### Luego los datos de la columna "date" son tranformados al formato timestamp en la columna "purchased_at", y posterior esta columna es borrada

In [114]:
session=SparkSession.builder.appName('data_processing').getOrCreate()

In [127]:
from pyspark.sql.functions import to_timestamp
schema = StructType().add("customer_id","integer").add("amount","integer").add("date",'string')
dataframeSpark = session.createDataFrame(data,schema=schema)
dataframeSpark = dataframeSpark.withColumn('purchased_at', col('date').cast('timestamp'))
dataframeSpark = dataframeSpark.drop('date')
dataframeSpark.show()
    

+-----------+------+-------------------+
|customer_id|amount|       purchased_at|
+-----------+------+-------------------+
|          0|   788|2017-10-24 00:00:00|
|          1|   857|2017-02-02 00:00:00|
|          0|   490|2017-11-21 00:00:00|
|          1|    39|2017-11-27 00:00:00|
|          0|   907|2017-02-28 00:00:00|
|          0|   351|2017-06-16 00:00:00|
|          1|   158|2017-03-24 00:00:00|
|          1|   373|2017-03-04 00:00:00|
|          1|   227|2017-06-26 00:00:00|
+-----------+------+-------------------+



### Se guardan los registros anteriormente mencionados en la tabla transactions en el motor prostgresql 

In [128]:
dataframeSpark.write.format('jdbc').options(
      url='jdbc:postgresql://localhost/ShoppingDB',
      dbtable='transactions',
      user='postgres',
      password='102800').mode('append').save()

### Volvemos a leer los datos desde el motor de la base de datos en postgresql,  en el cual vemos los datos insertados 

In [129]:
df = spark \
    .read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost/ShoppingDB") \
    .option("user", "postgres") \
    .option("password", "102800") \
    .option("dbtable", "transactions") \
    .load()

df.show()

+---+-----------+------+-------------------+
| id|customer_id|amount|       purchased_at|
+---+-----------+------+-------------------+
|  1|          1|    55|2017-03-01 09:00:00|
|  2|          1|   125|2017-03-01 10:00:00|
|  3|          1|    32|2017-03-02 13:00:00|
|  4|          1|    64|2017-03-02 15:00:00|
|  5|          1|   128|2017-03-03 10:00:00|
|  6|          2|   333|2017-03-01 09:00:00|
|  7|          2|   334|2017-03-01 09:01:00|
|  8|          2|   333|2017-03-01 09:02:00|
|  9|          2|    11|2017-03-03 20:00:00|
| 10|          2|    44|2017-03-03 20:15:00|
| 12|          0|   788|2017-10-24 00:00:00|
| 13|          0|   490|2017-11-21 00:00:00|
| 11|          1|   158|2017-03-24 00:00:00|
| 14|          0|   907|2017-02-28 00:00:00|
| 15|          1|   857|2017-02-02 00:00:00|
| 16|          0|   351|2017-06-16 00:00:00|
| 17|          1|    39|2017-11-27 00:00:00|
| 18|          1|   373|2017-03-04 00:00:00|
| 19|          1|   227|2017-06-26 00:00:00|
+---+-----