La Tarea #3 consiste en instalar PostgreSQL, para luego ejecutar el código en la notebook ubicada en https://github.com/fmezacr/BigDataTEC/tree/master/Clase1/DB  y hacer el ejercicio indicado.

Para la tarea asumiremos que una tienda en línea almacena las compras de sus clientes en una base de datos PostgreSQL. Aplicaremos ciertas operaciones básicas sobre los datos para mostrar lo que este tipo de plataformas ofrecen. A continuación, un ejemplo básico de modelo de datos, que se usará posteriormente.

El siguiente ejemplo de código utiliza la base de datos descrita anteriormente y muestra el contenido almacenado en ella.

In [1]:
import findspark
findspark.init('/opt/spark/spark')

from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, udf 
from pyspark.sql.types import DateType


In [2]:
spark = SparkSession \
    .builder \
    .appName("Basic JDBC pipeline") \
    .config("spark.driver.extraClassPath", "/opt/postgresql-42.2.9.jar") \
    .config("spark.executor.extraClassPath", "/opt/postgresql-42.2.9.jar") \
    .getOrCreate()

In [3]:
# Reading single DataFrame in Spark by retrieving all rows from a DB table.
df = spark \
    .read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost/postgres") \
    .option("user", "caro") \
    .option("password", "Caro2017") \
    .option("driver", "org.postgresql.Driver") \
    .option("dbtable", "transactions") \
    .load()

df.show()

+---+-----------+------+-------------------+
| id|customer_id|amount|       purchased_at|
+---+-----------+------+-------------------+
|  1|          1|    55|2017-03-01 09:00:00|
|  2|          1|   125|2017-03-01 10:00:00|
|  3|          1|    32|2017-03-02 13:00:00|
|  4|          1|    64|2017-03-02 15:00:00|
|  5|          1|   128|2017-03-03 10:00:00|
|  6|          2|   333|2017-03-01 09:00:00|
|  7|          2|   334|2017-03-01 09:01:00|
|  8|          2|   333|2017-03-01 09:02:00|
|  9|          2|    11|2017-03-03 20:00:00|
| 10|          2|    44|2017-03-03 20:15:00|
+---+-----------+------+-------------------+



Se utiliza date_format en el módulo pyspark.sql.functions. El siguiente código crea una columna nueva después de aplicar la transformación.

In [4]:

formatted_df = df.withColumn("date_string", date_format(col("purchased_at"), 'MM/dd/yyyy'))
formatted_df.show()

+---+-----------+------+-------------------+-----------+
| id|customer_id|amount|       purchased_at|date_string|
+---+-----------+------+-------------------+-----------+
|  1|          1|    55|2017-03-01 09:00:00| 03/01/2017|
|  2|          1|   125|2017-03-01 10:00:00| 03/01/2017|
|  3|          1|    32|2017-03-02 13:00:00| 03/02/2017|
|  4|          1|    64|2017-03-02 15:00:00| 03/02/2017|
|  5|          1|   128|2017-03-03 10:00:00| 03/03/2017|
|  6|          2|   333|2017-03-01 09:00:00| 03/01/2017|
|  7|          2|   334|2017-03-01 09:01:00| 03/01/2017|
|  8|          2|   333|2017-03-01 09:02:00| 03/01/2017|
|  9|          2|    11|2017-03-03 20:00:00| 03/03/2017|
| 10|          2|    44|2017-03-03 20:15:00| 03/03/2017|
+---+-----------+------+-------------------+-----------+



Una manera cómoda de encapsular funciones de Python que deben ser aplicadas a celdas del Dataframe, pero aún no tienen su implementación en las bibliotecas.

El siguiente ejemplo muestra la columna creada previamente (tipo string) y la transforma en un tipo DateType propio de Spark.

In [5]:
string_to_date = \
    udf(lambda text_date: datetime.strptime(text_date, '%m/%d/%Y'),
        DateType())

typed_df = formatted_df.withColumn("date", string_to_date(formatted_df.date_string))
typed_df.show()
typed_df.printSchema()

+---+-----------+------+-------------------+-----------+----------+
| id|customer_id|amount|       purchased_at|date_string|      date|
+---+-----------+------+-------------------+-----------+----------+
|  1|          1|    55|2017-03-01 09:00:00| 03/01/2017|2017-03-01|
|  2|          1|   125|2017-03-01 10:00:00| 03/01/2017|2017-03-01|
|  3|          1|    32|2017-03-02 13:00:00| 03/02/2017|2017-03-02|
|  4|          1|    64|2017-03-02 15:00:00| 03/02/2017|2017-03-02|
|  5|          1|   128|2017-03-03 10:00:00| 03/03/2017|2017-03-03|
|  6|          2|   333|2017-03-01 09:00:00| 03/01/2017|2017-03-01|
|  7|          2|   334|2017-03-01 09:01:00| 03/01/2017|2017-03-01|
|  8|          2|   333|2017-03-01 09:02:00| 03/01/2017|2017-03-01|
|  9|          2|    11|2017-03-03 20:00:00| 03/03/2017|2017-03-03|
| 10|          2|    44|2017-03-03 20:15:00| 03/03/2017|2017-03-03|
+---+-----------+------+-------------------+-----------+----------+

root
 |-- id: integer (nullable = true)
 |-- cu

Para sumar los datos podemos utilizar el concepto común de agrupamiento en SQL. Spark posee una agrupación de groupBy, directamente. En este caso, queremos sumar todas las compras por cliente y día:

In [6]:
sum_df = typed_df.groupBy("customer_id", "date").sum()
sum_df.show()

+-----------+----------+-------+----------------+-----------+
|customer_id|      date|sum(id)|sum(customer_id)|sum(amount)|
+-----------+----------+-------+----------------+-----------+
|          2|2017-03-01|     21|               6|       1000|
|          1|2017-03-02|      7|               2|         96|
|          1|2017-03-03|      5|               1|        128|
|          1|2017-03-01|      3|               2|        180|
|          2|2017-03-03|     19|               4|         55|
+-----------+----------+-------+----------------+-----------+



Es posible dar a las columnas un nombre más amigable con los consumidores de los datos.

In [7]:
stats_df = \
    sum_df.select(
        col('customer_id'),
        col('date'),
        col('sum(amount)').alias('amount'))

stats_df.printSchema()
stats_df.show()

root
 |-- customer_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- amount: long (nullable = true)

+-----------+----------+------+
|customer_id|      date|amount|
+-----------+----------+------+
|          2|2017-03-01|  1000|
|          1|2017-03-02|    96|
|          1|2017-03-03|   128|
|          1|2017-03-01|   180|
|          2|2017-03-03|    55|
+-----------+----------+------+



A continuación se muestra como cargar datos de un archivo CSV que contiene los nombres de los clientes, así como la moneda en que realizan transacciones. Nótese que el CSV no tiene información de tipos de datos, por lo que es buena práctica agregarlos explícitamente.

In [9]:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

names_df = spark \
    .read \
    .format("csv") \
    .option("path", "names.csv") \
    .option("header", True) \
    .schema(StructType([
                StructField("id", IntegerType()),
                StructField("name", StringType()),
                StructField("currency", StringType())])) \
    .load()

names_df.printSchema()
names_df.show()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- currency: string (nullable = true)

+---+----+--------+
| id|name|currency|
+---+----+--------+
|  1|John|     CRC|
|  2|Jane|     EUR|
+---+----+--------+



Una vez que la información fue cargada, la fuente particular no es relevante. A continuación se muestra como podemos enriquerecer la información utilizando la funcion JOIN entre data frames.

In [10]:
joint_df = stats_df.join(names_df, stats_df.customer_id == names_df.id)
joint_df.printSchema()
joint_df.show()

root
 |-- customer_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- amount: long (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- currency: string (nullable = true)

+-----------+----------+------+---+----+--------+
|customer_id|      date|amount| id|name|currency|
+-----------+----------+------+---+----+--------+
|          2|2017-03-01|  1000|  2|Jane|     EUR|
|          1|2017-03-02|    96|  1|John|     CRC|
|          1|2017-03-03|   128|  1|John|     CRC|
|          1|2017-03-01|   180|  1|John|     CRC|
|          2|2017-03-03|    55|  2|Jane|     EUR|
+-----------+----------+------+---+----+--------+

