![logo](logo.png "Logo")

# Módulo 4 - Big Data

Autor: Juan Esquivel

# Objetivos

## General
Entender y aplicar técnicas de análisis de grandes cantidades de datos para la resolución de problemas concretos a través de tecnologías de manipulación, extracción y sintetización estadística.

## Específicos
- Aplicar bibliotecas para la transformación de datos a gran escala para poder sintetizar el conocimiento para futuro análisis.
- Aplicar técnicas de análisis de datos para extraer patrones que mejoren el entendimiento de un problema concreto.
- Aplicar técnicas para aprendizaje automatizado de patrones, basado en datos existentes, para mejorar la certeza de la solución aplicada a problemas concretos.


# Contenidos

- Introducción a procesamiento a gran escala
- Fuentes y repositorios de datos
- Procesamiento de fuentes (data frames)
- Procesamiento de atributos
- Organización de datos procesados
- Análisis de datos a gran escala
- Uso de modelos de aprendizaje automático a gran escala

# Evaluación

- Tareas Cortas 70%
- Proyecto 30%

# Procesamiento de datos a gran escala
Hoy día es relativamente sencillo tener cantidades muy considerables de datos en una organización, sin tan siquiera darse cuenta. El almacenamiento, local o en la nube, ha llegado a un punto que nos permite simplemente almacenar más y más datos sin tener que preocuparse por costos de almacenamiento. Cualquier sistema de ventas, por ejemplo, almacena información diaria sobre clientes, transacciones, tiendas, productos, etc. Después de un par de años de acumular datos, un programa que *funcionaba bien* de pronto empieza a sufrir efectos de degradación de servicio, debido al crecimiento orgánico de los datos.

Aún más, es más común que no sea factible tener todos los datos relacionados a un problema en una sola máquina física. Por ello, es necesario particionar los datos y distribuirlos en granjas de servidores de almacenamiento. Algunas definiciones colocan la línea limítrofe de Big Data utilizando esa barrera de almacenamiento distribuído.

## Lectura y transformación de datos a gran escala
El mayor tamaño de las fuentes de datos requiere que, para obtener información estratégica, se deba implementar elementos de software que permitan procesar y resumir los datos. Normalmente se refiere a ésto como *ETL: Extract, Transform and Load*.

Primero, los datos deben *Extraerse* de las fuentes. Después, deben ser *Transformados*, ya sea para crear datos más complejos o ajustar para que sea compatible con el destino. Por último, los datos deben ser *Cargados* a un medio donde serán consumidos por los usuarios.

A lo largo de la última década, la necesidad de administrar procesos *ETL* a gran escala se ha convertido en un imperativo para un científico de datos. Esto se realiza, comúnmente, utilizando *frameworks* de alta escalabilidad basados en un modelo de programación llamado *MapReduce*. En esencia, la idea es no pensar sobre los datos como conjuntos enormes de información, sino reducir las operaciones a qué debe realizarse "fila por fila". Queremos aplicar operaciones individuales a cada elemento, independiente de cualquier otro en la colección (al menos en teoría). Las operaciones que modifican cada elemento se llaman funciones *map*. Adicionalmente, podemos generar operaciones que sinteticen múltiples elementos que corresponden a la misma entidad reducidendólos a un solo elemento (*reduce*). Plataformas modernas ya no se apegan al espíritu exacto de *MapReduce*, sin embargo, las ideas básicas se mantienen igual.

En las siguiente sección introducimos los conceptos clásicos de *MapReduce*. Posteriormente, realizaremos una introducción práctica a frameworks contemporáneos. En este módulo utilizaremos Apache Spark para mostrar los conceptos básicos de este tipo de procesamiento. 

# Motivación MapReduce
El problema resuelto por *MapReduce* se ilustra con el ejemplo clásico de la frecuencia de palabras en una colección abundante de texto. Si tenemos un corpus compuesto por millones de libros de texto, podríamos pensar en un algoritmo de fuerza bruta que representa cada libro como un arreglo de tipo `string`:
```
func count(words []string) {
  var counts map[string]int
  
  for _, word:=range words {
    counts[word] += 1
  }
  printSorted(counts)
}
```

En Go el anterior ejemplo puede alcanzar órdenes de magnitudes de centenas de millones, convirtiéndose la memoria en el cuello de botella primario. Es posible que el procesamiento no requiera todas las palabras en memoria, utilizando canales de transmisión (`channel` en Go). Sin embargo, cuando todas las palabras son únicas aún tenemos problemas.

Si dejamos los linderos de una máquina individual y asumimos que tenemos, por ejemplo, 10 núcleos de procesamiento. Podríamos dividir el total de los datos en 10 partes, cada núcleo puede generar conteos para cada segmento de datos y, posteriormente, lo envía a un controlador. El controlador procedería a integrar todos los conteos.

Si queremos repetir este proceso con 1000 máquinas, un solo controlador colapsaría. Podemos agregar una jerarquía de controladores, sin embargo, donde cada 10 máquina envía a un controlador intermedio y esos, a su vez, a un controlador mayor.

Cuando se trabaja con un número tan elevado de núcleos de procesamiento, la probabilidad de que alguna máquina presente errores se incrementa. Si definimos el error que una sola máquina falle como $\epsilon=0.001$ podemos definir la probabilidad que 10 máquinas ejecuten el proceso sin tener errores como $(1-\epsilon)^{10}=0.999^{10}=0.9900448802$. Esto quiere decir que en más del 99% de las ejecuciones no debería existir errores en las máquinas y, por lo tanto, la tarea debería concluirse exitosamente. El mismo razonamiento para 1000 máquinas, sin embargo, nos da una probabilidad que todos los núcleos sean exitosos de 37%, únicamente ($(1-\epsilon)^{1000}=0.999^{1000}=0.3676954248$.

Esto quiere deir que el modelado debe incorporar tolerancia a fallos integralemente, replicando las entradas (3 copias), distribuyendo el mismo trabajo entre máquinas no relacionadas entre sí, utilización de checksums, etc.

## Detalles de MapReduce
De acuerdo a su publicación original *MapReduce* es un modelo de programación para procesar y generar grandes conjuntos de datos. Todo el procesamiento se basa en expresar los datos en pares *llave/valor*. Se debe definir una función de mapeo de los pares originales a una representación intermedia que, posteriormente, es tomada por una función de reducción que une todas las llaves en una sola entrada. Existe una fase oculta a los usuarios que ordena los datos (*shuffle*) previo a la reducción. Si se tienen $R$ máquinas a cargo de la reducción, la fase de shuffle asigna los datos intermedios al nodo con índice `hash(key) % R`.

En gran medida, la popularidad de este modelo es que una gran cantidad de problemas reales se pueden modelar de esta forma. Además, los autores crearon un ambiente de ejecución donde los programas escritos en esta forma eran paralelizados y ejecutados de forma distribuida de manera automática. 

Para el ejemplo del conteo de frecuencias de palabras, el algoritmo para resolverlo sería:

```
map(String key, String[] value):
  // key: document name
  // value: document contents
  for each word in value:
    EmitIntermediate(w, "1")

reduce(String key, Iterator values):
  // key: a word
  // values: a list of counts
  int result = 0;
  for each v in values:
    result += ParseInt(v);
  Emit(AsString(result));
```

El siguiente diagrama muestra el proceso completo que efectúa el ambiente de ejecución de *MapReduce*.

![mapreduce](mr.png "Map Reduce")

## Limitaciones de MapReduce
Como se mencionó anteriormente, los problemas resueltos por MapReduce *deben* expresarse como una colección de elementos *llave/valor*. De no ser posible, no se puede utilizar el framework para bordarlos.

Además, tiene problemas con algoritmos que son iterativos, debido a que en la fase del mapeo se asume que cada elemento es independiente de otros.



# Datos de ejemplo
A manera de ejemplo, asumiremos que una tienda en línea almacena las compras de sus clientes en una base de datos PostgreSQL. Aplicaremos ciertas operaciones básicas sobre los datos para mostrar lo que este tipo de plataformas ofrecen. A continuación, un ejemplo básico de modelo de datos, que se usará posteriormente.

```sql
CREATE TABLE transactions (
  id SERIAL PRIMARY KEY,
  customer_id integer NOT NULL,
  amount integer NOT NULL,
  purchased_at timestamp without time zone NOT NULL
);


INSERT INTO "transactions" (customer_id, amount, purchased_at) VALUES
(1, 55, '2017-03-01 09:00:00'),
(1, 125, '2017-03-01 10:00:00'),
(1, 32, '2017-03-02 13:00:00'),
(1, 64, '2017-03-02 15:00:00'),
(1, 128, '2017-03-03 10:00:00'),
(2, 333, '2017-03-01 09:00:00'),
(2, 334, '2017-03-01 09:01:00'),
(2, 333, '2017-03-01 09:02:00'),
(2, 11, '2017-03-03 20:00:00'),
(2, 44, '2017-03-03 20:15:00');
```

# Apache Spark

Existen multiples *frameworks* para procesar datos a gran escala. Para el propósito de esta clase, utilizaremos Apache Spark ya que es un proyecto abierto que ha sido adoptado por una gran candidad de desarrolladores y también es un motor primario en servicios en la nube, como Amazon Web Services. 

Spark está basado en una abstracción llamada *Resilient Distributed Dataset* (RDD) que es una colección de elementos que pueden ser almacenados, temporal o permanentemente, a lo largo de múltuples nodos físicos de un cluster. Spark permite el uso de múltiples fuentes de datos, desde SQL y NoSQL hasta archivos de texto plano. La tarea primaria de un programador es diseñar las operaciones sucesivas par transformar el RDD de su forma original a la salida deseada.

El siguiente ejemplo de código utiliza la base de datos descrita anteriormente y muestra el contenido almacenado en ella. Nótese que si la tabla fuera muy grande llamar al método `show()` no es recomendable.

In [1]:
import findspark
findspark.init('C:/tools/Spark/spark-2.4.4-bin-hadoop2.7')

In [2]:
import pyspark

from datetime import datetime
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, udf 
from pyspark.sql.types import DateType

spark = SparkSession.builder \
    .appName("Basic JDBC pipeline") \
    .config("spark.driver.extraClassPath", "C:/tools/postgresql_jdbc/postgresql-42.2.9.jar") \
    .getOrCreate()

In [3]:
# Reading single DataFrame in Spark by retrieving all rows from a DB table.
df = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost/Tarea3") \
    .option("user", "postgres") \
    .option("password", "Nalia07@") \
    .option("dbtable", "transactions") \
    .load()


In [4]:
df.show()

+---+-----------+------+-------------------+
| id|customer_id|amount|       purchased_at|
+---+-----------+------+-------------------+
|  2|          1|    55|2017-03-01 09:00:00|
|  3|          1|   125|2017-03-01 10:00:00|
|  4|          1|    32|2017-03-02 13:00:00|
|  5|          1|    64|2017-03-02 15:00:00|
|  6|          1|   128|2017-03-03 10:00:00|
|  7|          2|   333|2017-03-01 09:00:00|
|  8|          2|   334|2017-03-01 09:01:00|
|  9|          2|   333|2017-03-01 09:02:00|
| 10|          2|    11|2017-03-03 20:00:00|
| 11|          2|    44|2017-03-03 20:15:00|
+---+-----------+------+-------------------+



Leer datos directamente rara vez es el objetivo primario. Las dos tareas típicas son: transformar algunas de las columnas en representaciones modificadas de las mismas, o bien, agregar grupos de filas en una. A manera de ejemplo, podemos asumir que se quiere la información del total de compras hechas por clientes.

Spark provee ciertas transformaciones básicas. Por ejemplo, transformar fechas a una representación con formato específico, para poder truncar la una estampilla de tiempo a nivel de día. En este caso, se utiliza `date_format` en el módulo `pyspark.sql.functions`. El siguiente código crea una columna nueva después de aplicar la transformación.

In [5]:
formatted_df = df.withColumn("date_string", date_format(col("purchased_at"), 'MM/dd/yyyy'))
formatted_df.show()

+---+-----------+------+-------------------+-----------+
| id|customer_id|amount|       purchased_at|date_string|
+---+-----------+------+-------------------+-----------+
|  2|          1|    55|2017-03-01 09:00:00| 03/01/2017|
|  3|          1|   125|2017-03-01 10:00:00| 03/01/2017|
|  4|          1|    32|2017-03-02 13:00:00| 03/02/2017|
|  5|          1|    64|2017-03-02 15:00:00| 03/02/2017|
|  6|          1|   128|2017-03-03 10:00:00| 03/03/2017|
|  7|          2|   333|2017-03-01 09:00:00| 03/01/2017|
|  8|          2|   334|2017-03-01 09:01:00| 03/01/2017|
|  9|          2|   333|2017-03-01 09:02:00| 03/01/2017|
| 10|          2|    11|2017-03-03 20:00:00| 03/03/2017|
| 11|          2|    44|2017-03-03 20:15:00| 03/03/2017|
+---+-----------+------+-------------------+-----------+



En el caso que necesitemos crear una función que no es parte de la biblioteca estándar en Spark, es posible definir funciones creadas por el usuario (User Defined Functions o *udf*). La noción básica de una `udf` en Spark un lambda acompañado por el tipo de dato retornado. Lo anterior es estrictamente necesario en lenguajes con un sistema de tipos débil.

Además, es una manera cómoda de encapsular funciones de Python que deben ser aplicadas a celdas del *Dataframe*, pero aún no tienen su implementación en las bibliotecas.

El siguiente ejemplo muestra la columna creada previamente (tipo `string`) y la transforma en un tipo `DateType` propio de Spark.

In [6]:
string_to_date = \
    udf(lambda text_date: datetime.strptime(text_date, '%m/%d/%Y'),
        DateType())

typed_df = formatted_df.withColumn("date", string_to_date(formatted_df.date_string))
typed_df.show()
typed_df.printSchema()


+---+-----------+------+-------------------+-----------+----------+
| id|customer_id|amount|       purchased_at|date_string|      date|
+---+-----------+------+-------------------+-----------+----------+
|  2|          1|    55|2017-03-01 09:00:00| 03/01/2017|2017-03-01|
|  3|          1|   125|2017-03-01 10:00:00| 03/01/2017|2017-03-01|
|  4|          1|    32|2017-03-02 13:00:00| 03/02/2017|2017-03-02|
|  5|          1|    64|2017-03-02 15:00:00| 03/02/2017|2017-03-02|
|  6|          1|   128|2017-03-03 10:00:00| 03/03/2017|2017-03-03|
|  7|          2|   333|2017-03-01 09:00:00| 03/01/2017|2017-03-01|
|  8|          2|   334|2017-03-01 09:01:00| 03/01/2017|2017-03-01|
|  9|          2|   333|2017-03-01 09:02:00| 03/01/2017|2017-03-01|
| 10|          2|    11|2017-03-03 20:00:00| 03/03/2017|2017-03-03|
| 11|          2|    44|2017-03-03 20:15:00| 03/03/2017|2017-03-03|
+---+-----------+------+-------------------+-----------+----------+

root
 |-- id: integer (nullable = true)
 |-- cu

Para sumar los datos podemos utilizar el concepto común de agrupamiento en SQL. Spark posee una agrupación de `groupBy`, directamente. En este caso, queremos sumar todas las compras por cliente y día:

In [7]:
sum_df = typed_df.groupBy("customer_id", "date").sum()
sum_df.show()

+-----------+----------+-------+----------------+-----------+
|customer_id|      date|sum(id)|sum(customer_id)|sum(amount)|
+-----------+----------+-------+----------------+-----------+
|          2|2017-03-01|     24|               6|       1000|
|          1|2017-03-02|      9|               2|         96|
|          1|2017-03-03|      6|               1|        128|
|          1|2017-03-01|      5|               2|        180|
|          2|2017-03-03|     21|               4|         55|
+-----------+----------+-------+----------------+-----------+



Spark sumará todas las columnas **que no se encuentren** especificadas en la operación `groupBy`. Algunos resultados no tienen interpretación últil. Por ejemplo, sumar la columna `customer_id` no da ningún valor agregado. Finalmente, es posible dar a las columnas un nombre más amigable con los consumidores de los datos.

In [8]:
stats_df = \
    sum_df.select(
        col('customer_id'),
        col('date'),
        col('sum(amount)').alias('amount'))

stats_df.printSchema()
stats_df.show()


root
 |-- customer_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- amount: long (nullable = true)

+-----------+----------+------+
|customer_id|      date|amount|
+-----------+----------+------+
|          2|2017-03-01|  1000|
|          1|2017-03-02|    96|
|          1|2017-03-03|   128|
|          1|2017-03-01|   180|
|          2|2017-03-03|    55|
+-----------+----------+------+



Spark permite cargar información de múltiples fuentes. A continuación se muestra como cargar datos de un archivo CSV que contiene los nombres de los clientes, así como la moneda en que realizan transacciones. Nótese que el CSV no tiene información de tipos de datos, por lo que es buena práctica agregarlos explícitamente.

In [11]:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

names_df = spark \
    .read \
    .format("csv") \
    .option("path", "names.csv") \
    .option("header", True) \
    .schema(StructType([
                StructField("id", IntegerType()),
                StructField("name", StringType()),
                StructField("currency", StringType())])) \
    .load()

names_df.printSchema()
names_df.show()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- currency: string (nullable = true)

+---+----+--------+
| id|name|currency|
+---+----+--------+
|  1|John|     CRC|
|  2|Jane|     EUR|
+---+----+--------+



Una vez que la información fue cargada, la fuente particular no es relevante. A continuación se muestra como podemos enriquerecer la información utilizando la funcion **JOIN** entre *data frames*.

In [12]:
joint_df = stats_df.join(names_df, stats_df.customer_id == names_df.id)
joint_df.printSchema()
joint_df.show()

root
 |-- customer_id: integer (nullable = true)
 |-- date: date (nullable = true)
 |-- amount: long (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- currency: string (nullable = true)

+-----------+----------+------+---+----+--------+
|customer_id|      date|amount| id|name|currency|
+-----------+----------+------+---+----+--------+
|          2|2017-03-01|  1000|  2|Jane|     EUR|
|          1|2017-03-02|    96|  1|John|     CRC|
|          1|2017-03-03|   128|  1|John|     CRC|
|          1|2017-03-01|   180|  1|John|     CRC|
|          2|2017-03-03|    55|  2|Jane|     EUR|
+-----------+----------+------+---+----+--------+



# Ejercicio
- Cree un pequeño generador de transacciones. Para ello, puede utilizar funciones de numpy o pandas, para crear las transacciones nuevas en memoria y, posteriormente, puede cargarlas a un *Spark Dataframe* que después debe insertar en la base de datos.
- Con los datos nuevos, ejecute el código de éste notebook.

# Configuración
Para ejecutar el código de ejemplo en este notebook es necesario instalar Jupyter localmente. con soporte de Python. También es necesario instalar una serie de módulos utilizando el comando `pip`:

```bash
pip3 install pyspark
pip3 install findspark
```

Referencias adicionales: [Spark](https://spark.apache.org/) y [PostgreSQL](https://www.postgresql.org/).

# Referencias
- Schutt, R; O'Neill C. Doing Data Science - Straight Talk from the Frontline. O'Reilly Media. 2013 (Capítulo 14)
- Dean, J; Ghemawat, S. MapReduce: Simplified Data Processing on Large Clusters. https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf

# INICIO DE LA TAREA

In [48]:
import random
from datetime import datetime, date
from random import randint, uniform
import pandas as pd
import random

In [49]:
#generado números del 1 al 5 porque incluí una tabla de clientes con códigos del 1 al 5
def get_CodigoCliente():
    return randint(0,5)

#prueba de generación
#for i in range(10):
#    print(get_CodigoCliente())

In [58]:
#generador de montos de compra que van desde 1,900 hasta 850,000 con dos decimales
def get_MontoCompra():
    return  round (uniform(1900,850000),2)

#prueba de generación
#for i in range(10):
#    print(get_MontoCompra())

In [127]:
#genera fechas aleatorias, defini 2000 como el año inicial, la fecha se requiere como timestamp para que sea compatible
#con los datos de la tabla. La lógica de generación la tome de stackoverflow
def get_FechaAleatoria():     
    AMin=2000
    AMax=datetime.now().year
    Inicio = datetime(AMin, 1, 1, 00, 00, 00)
    AA = AMax - AMin + 1
    Fin = Inicio + timedelta(days=365 * AA)
    return Inicio + (Fin - Inicio) * random.random()
    

#Prueba de generación de fechas
#for i in range(10):
#    print(gen_datetime())

### Generación de datos para incluir en base de datos

In [128]:
#generar los datos en un dataframe, por medio de las funciones creadas
DatosGenerados = pd.DataFrame()

for i in range(250):
    DatosGenerados = DatosGenerados.append({"customer_id": get_CodigoCliente(), 
                    "amount": get_MontoCompra(), 
                    "purchased_at": get_FechaAleatoria()}, ignore_index=True)
    
#imprimir los primeros 20 registros
DatosGenerados.head(20)

Unnamed: 0,amount,customer_id,purchased_at
0,350955.27,3.0,2015-07-28 09:30:59.240141
1,46599.88,5.0,2010-09-04 09:33:43.629924
2,295802.33,3.0,2002-03-18 02:01:56.168931
3,75583.57,3.0,2002-02-08 18:28:37.994148
4,101572.16,1.0,2008-01-14 02:50:38.915847
5,130123.59,3.0,2007-09-03 20:56:35.710260
6,108131.73,1.0,2004-03-20 10:19:41.031294
7,723547.06,3.0,2015-01-14 06:20:41.697110
8,167834.69,1.0,2019-06-09 07:17:58.411117
9,369487.5,0.0,2010-09-14 18:08:34.531931


### DEFINICION DEL AMBIENTE PYSPARK 

In [129]:
import findspark
findspark.init('C:/tools/Spark/spark-2.4.4-bin-hadoop2.7')
from pyspark.sql.types import *

In [130]:
import pyspark

from datetime import datetime
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, udf 
from pyspark.sql.types import DateType

spark2 = SparkSession.builder \
    .appName("Basic JDBC pipeline") \
    .config("spark.driver.extraClassPath", "C:/tools/postgresql_jdbc/postgresql-42.2.9.jar") \
    .getOrCreate()

#### Creación del esquema para pandas 

In [131]:
#Primero creo la estructura que tiene la tabla, por lo tanto debe ser la misma del dataframe

EsquemaTransacciones = StructType([ StructField("amount", FloatType(), True), \
                        StructField("customer_id", FloatType(), True), \
                        StructField("purchased_at", TimestampType(), True)])

spark2 = SparkSession.builder.appName('AppPandasASpark').getOrCreate()

DatosSpark = spark2.createDataFrame(DatosGenerados, schema=EsquemaTransacciones)
DatosSpark.show()

+---------+-----------+--------------------+
|   amount|customer_id|        purchased_at|
+---------+-----------+--------------------+
|350955.28|        3.0|2015-07-28 09:30:...|
| 46599.88|        5.0|2010-09-04 09:33:...|
|295802.34|        3.0|2002-03-18 02:01:...|
| 75583.57|        3.0|2002-02-08 18:28:...|
|101572.16|        1.0|2008-01-14 02:50:...|
|130123.59|        3.0|2007-09-03 20:56:...|
|108131.73|        1.0|2004-03-20 10:19:...|
|723547.06|        3.0|2015-01-14 06:20:...|
|167834.69|        1.0|2019-06-09 07:17:...|
| 369487.5|        0.0|2010-09-14 18:08:...|
| 819431.6|        3.0|2013-04-11 02:18:...|
|270850.75|        5.0|2019-03-15 14:20:...|
| 80700.16|        3.0|2006-09-04 22:09:...|
|847249.06|        1.0|2017-07-28 07:17:...|
| 584492.9|        0.0|2001-02-18 06:49:...|
| 705096.5|        1.0|2016-11-08 03:37:...|
|170827.55|        3.0|2002-09-07 10:54:...|
|375178.16|        3.0|2006-08-08 14:39:...|
|437955.38|        5.0|2010-06-08 23:48:...|
|182986.69

In [134]:
DatosSpark.write.mode("overwrite") \
   .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost/Tarea3") \
    .option("user", "postgres") \
    .option("password", "Nalia07@") \
    .option("dbtable", "transactions") \
    .save()

In [135]:
# Reading single DataFrame in Spark by retrieving all rows from a DB table.
DatosNuevos = spark.read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://localhost/Tarea3") \
    .option("user", "postgres") \
    .option("password", "Nalia07@") \
    .option("dbtable", "transactions") \
    .load()

DatosNuevos.show()

+---------+-----------+--------------------+
|   amount|customer_id|        purchased_at|
+---------+-----------+--------------------+
|350955.28|        3.0|2015-07-28 09:30:...|
| 46599.88|        5.0|2010-09-04 09:33:...|
|295802.34|        3.0|2002-03-18 02:01:...|
| 75583.57|        3.0|2002-02-08 18:28:...|
|101572.16|        1.0|2008-01-14 02:50:...|
|130123.59|        3.0|2007-09-03 20:56:...|
|108131.73|        1.0|2004-03-20 10:19:...|
|723547.06|        3.0|2015-01-14 06:20:...|
|167834.69|        1.0|2019-06-09 07:17:...|
| 369487.5|        0.0|2010-09-14 18:08:...|
| 819431.6|        3.0|2013-04-11 02:18:...|
|270850.75|        5.0|2019-03-15 14:20:...|
| 80700.16|        3.0|2006-09-04 22:09:...|
|847249.06|        1.0|2017-07-28 07:17:...|
| 584492.9|        0.0|2001-02-18 06:49:...|
| 705096.5|        1.0|2016-11-08 03:37:...|
|170827.55|        3.0|2002-09-07 10:54:...|
|375178.16|        3.0|2006-08-08 14:39:...|
|437955.38|        5.0|2010-06-08 23:48:...|
|182986.69

### No voy a repetir el texto completo del notebook, porque esta arriba, lo voy a resumir.

Transformar fechas a una representación con formato específico, para poder truncar la una estampilla de tiempo a nivel de día. En este caso, se utiliza date_format en el módulo pyspark.sql.functions. El siguiente código crea una columna nueva después de aplicar la transformación.

In [136]:
formatted_DatosNuevos = DatosNuevos.withColumn("date_string", date_format(col("purchased_at"), 'MM/dd/yyyy'))
formatted_DatosNuevos.show()

+---------+-----------+--------------------+-----------+
|   amount|customer_id|        purchased_at|date_string|
+---------+-----------+--------------------+-----------+
|350955.28|        3.0|2015-07-28 09:30:...| 07/28/2015|
| 46599.88|        5.0|2010-09-04 09:33:...| 09/04/2010|
|295802.34|        3.0|2002-03-18 02:01:...| 03/18/2002|
| 75583.57|        3.0|2002-02-08 18:28:...| 02/08/2002|
|101572.16|        1.0|2008-01-14 02:50:...| 01/14/2008|
|130123.59|        3.0|2007-09-03 20:56:...| 09/03/2007|
|108131.73|        1.0|2004-03-20 10:19:...| 03/20/2004|
|723547.06|        3.0|2015-01-14 06:20:...| 01/14/2015|
|167834.69|        1.0|2019-06-09 07:17:...| 06/09/2019|
| 369487.5|        0.0|2010-09-14 18:08:...| 09/14/2010|
| 819431.6|        3.0|2013-04-11 02:18:...| 04/11/2013|
|270850.75|        5.0|2019-03-15 14:20:...| 03/15/2019|
| 80700.16|        3.0|2006-09-04 22:09:...| 09/04/2006|
|847249.06|        1.0|2017-07-28 07:17:...| 07/28/2017|
| 584492.9|        0.0|2001-02-

USER DEFINED FUNCTIONS o UDF 

La noción básica de una udf en Spark un lambda acompañado por el tipo de dato retornado. Lo anterior es estrictamente necesario en lenguajes con un sistema de tipos débil.

El siguiente ejemplo muestra la columna creada previamente (tipo string) y la transforma en un tipo DateType propio de Spark.

In [138]:
string_to_date = \
    udf(lambda text_date: datetime.strptime(text_date, '%m/%d/%Y'),
        DateType())

typed_DatosNuevos = formatted_DatosNuevos.withColumn("date", string_to_date(formatted_DatosNuevos.date_string))
typed_DatosNuevos.show()
typed_DatosNuevos.printSchema()

+---------+-----------+--------------------+-----------+----------+
|   amount|customer_id|        purchased_at|date_string|      date|
+---------+-----------+--------------------+-----------+----------+
|350955.28|        3.0|2015-07-28 09:30:...| 07/28/2015|2015-07-28|
| 46599.88|        5.0|2010-09-04 09:33:...| 09/04/2010|2010-09-04|
|295802.34|        3.0|2002-03-18 02:01:...| 03/18/2002|2002-03-18|
| 75583.57|        3.0|2002-02-08 18:28:...| 02/08/2002|2002-02-08|
|101572.16|        1.0|2008-01-14 02:50:...| 01/14/2008|2008-01-14|
|130123.59|        3.0|2007-09-03 20:56:...| 09/03/2007|2007-09-03|
|108131.73|        1.0|2004-03-20 10:19:...| 03/20/2004|2004-03-20|
|723547.06|        3.0|2015-01-14 06:20:...| 01/14/2015|2015-01-14|
|167834.69|        1.0|2019-06-09 07:17:...| 06/09/2019|2019-06-09|
| 369487.5|        0.0|2010-09-14 18:08:...| 09/14/2010|2010-09-14|
| 819431.6|        3.0|2013-04-11 02:18:...| 04/11/2013|2013-04-11|
|270850.75|        5.0|2019-03-15 14:20:...| 03/

Para sumar los datos podemos utilizar el concepto común de agrupamiento en SQL. Spark posee una agrupación de groupBy, directamente. En este caso, queremos sumar todas las compras por cliente y día:



In [140]:
sum_DatosNuevos = typed_DatosNuevos.groupBy("customer_id", "date").sum()
sum_DatosNuevos.show()

+-----------+----------+--------------+----------------+
|customer_id|      date|   sum(amount)|sum(customer_id)|
+-----------+----------+--------------+----------------+
|        3.0|2006-08-08|  375178.15625|             3.0|
|        0.0|2001-02-18|    584492.875|             0.0|
|        5.0|2014-10-29|  228326.34375|             5.0|
|        2.0|2009-03-22|   717238.0625|             2.0|
|        2.0|2009-02-23|   466119.0625|             2.0|
|        5.0|2002-02-18| 179576.296875|             5.0|
|        5.0|2014-02-16|   421361.6875|             5.0|
|        1.0|2018-04-29|  309794.40625|             1.0|
|        1.0|2007-06-21|35489.51171875|             1.0|
|        0.0|2013-03-03| 210485.953125|             0.0|
|        1.0|2018-01-07|    264014.875|             1.0|
|        4.0|2014-05-16|   69801.71875|             4.0|
|        2.0|2003-05-25|   179707.3125|             2.0|
|        0.0|2001-12-06|  408123.59375|             0.0|
|        3.0|2014-12-18|   6158

Spark sumará todas las columnas que no se encuentren especificadas en la operación groupBy. Algunos resultados no tienen interpretación últil. Por ejemplo, sumar la columna customer_id no da ningún valor agregado. Finalmente, es posible dar a las columnas un nombre más amigable con los consumidores de los datos.

In [141]:
stats_DatosNuevos = \
    sum_DatosNuevos.select(
        col('customer_id'),
        col('date'),
        col('sum(amount)').alias('amount'))

stats_DatosNuevos.printSchema()
stats_DatosNuevos.show()

root
 |-- customer_id: float (nullable = true)
 |-- date: date (nullable = true)
 |-- amount: double (nullable = true)

+-----------+----------+--------------+
|customer_id|      date|        amount|
+-----------+----------+--------------+
|        3.0|2006-08-08|  375178.15625|
|        0.0|2001-02-18|    584492.875|
|        5.0|2014-10-29|  228326.34375|
|        2.0|2009-03-22|   717238.0625|
|        2.0|2009-02-23|   466119.0625|
|        5.0|2002-02-18| 179576.296875|
|        5.0|2014-02-16|   421361.6875|
|        1.0|2018-04-29|  309794.40625|
|        1.0|2007-06-21|35489.51171875|
|        0.0|2013-03-03| 210485.953125|
|        1.0|2018-01-07|    264014.875|
|        4.0|2014-05-16|   69801.71875|
|        2.0|2003-05-25|   179707.3125|
|        0.0|2001-12-06|  408123.59375|
|        3.0|2014-12-18|   615805.9375|
|        0.0|2010-09-14|     1018428.5|
|        3.0|2011-02-01| 253001.015625|
|        2.0|2003-06-19|     582520.25|
|        3.0|2015-01-14|   723547.0625|


Spark permite cargar información de múltiples fuentes. A continuación se muestra como cargar datos de un archivo CSV que contiene los nombres de los clientes, así como la moneda en que realizan transacciones. Nótese que el CSV no tiene información de tipos de datos, por lo que es buena práctica agregarlos explícitamente.

In [142]:
from pyspark.sql.types import IntegerType, StringType, StructField, StructType

names_df = spark \
    .read \
    .format("csv") \
    .option("path", "names.csv") \
    .option("header", True) \
    .schema(StructType([
                StructField("id", IntegerType()),
                StructField("name", StringType()),
                StructField("currency", StringType())])) \
    .load()

names_df.printSchema()
names_df.show()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- currency: string (nullable = true)

+---+----+--------+
| id|name|currency|
+---+----+--------+
|  1|John|     CRC|
|  2|Jane|     EUR|
+---+----+--------+

