# Analisis inicial de datos con Spark

## Introduccion

Para la entrega del diseño inicial debemos tener explicatados en el documentos los siguiente pasos:

1- Clasificar el problema. Que tipo de problema tenemos? Regresion o clasificacion? Por que? <br>

2- Tenemos los datos. <br>
2.1 Hay que depurarlos. <br>
2.2 Detectar y eliminar anomalias <br>
2.3 Valores faltantes para ciertos atributos <br>

3- Explorar datos (no se procesan) <br>
3.1 Analizar la estructura <br>
3.2 Navegar datos <br>
3.3 Resumen estadistico. <br>
3.4 Visualizar datos <br>

4- Aplicar a los datos el algoritmo necesario <br>

5- Comunicar resultados <br>

Una de las herramientas que aprendimos y nos resulta sencilla es el Map-Reduce de Spark.<br>
En el caso de necesitar graficos podemos pedirle al Spark Context que nos devuelva los datos y luego se los pasamos a una libreria de ploteo de Python3.


## Desarrollo

In [6]:
import pyspark
from pyspark.sql import SQLContext

try: 
    type(sc)
except NameError:
    sc = pyspark.SparkContext('local[*]')    
sqlCtx = SQLContext(sc)

In [10]:
import pyspark_csv as pycsv
sc.addPyFile('pyspark_csv.py')
plaintext_rdd = sc.textFile('data/train.csv')
dataframe = pycsv.csvToDataFrame(sqlCtx, plaintext_rdd, parseDate=False)

data = dataframe.rdd

## Productos diferentes

In [16]:
products = data.map(lambda row: row.ProductId).distinct()
products.count()

67726

## 20 productos con mas reviews

In [18]:
data.map(lambda row: (row.ProductId, 1)).reduceByKey(lambda a, b: a+b).takeOrdered(20, key = lambda x: -x[1])

[('B007JFMH8M', 714),
 ('B003B3OOPA', 511),
 ('B0026RQTGE', 509),
 ('B002QWP8H0', 508),
 ('B002QWP89S', 499),
 ('B002QWHJOU', 493),
 ('B0026KNQSA', 468),
 ('B001RVFEP2', 456),
 ('B007M83302', 454),
 ('B0013NUGDE', 451),
 ('B006HYLW32', 450),
 ('B001EO5Q64', 449),
 ('B000VK8AVK', 449),
 ('B0026KPDG8', 448),
 ('B007M832YY', 444),
 ('B000KV7ZGQ', 441),
 ('B005K4Q1YA', 440),
 ('B005K4Q4LK', 435),
 ('B000KV61FC', 432),
 ('B000NMJWZO', 432)]

## 20 productos con mayor calificaciones

In [21]:
data.map(lambda row: (row.ProductId, (row.Prediction, 1)))\
    .reduceByKey(lambda a,b: (a[0]+b[0],a[1]+b[1]))\
    .takeOrdered(20, key = lambda x: -(x[1][0]*100/x[1][1]))

[('B003XEG95U', (5, 1)),
 ('B004W0W264', (15, 3)),
 ('B007Y4D6W4', (5, 1)),
 ('B0000W2T0C', (5, 1)),
 ('B001BYEHBW', (5, 1)),
 ('B001E52ZQW', (15, 3)),
 ('B000F1Z66M', (5, 1)),
 ('B003PZY08U', (5, 1)),
 ('B0009BLNSS', (5, 1)),
 ('B002I7OP0E', (5, 1)),
 ('B0027E2MT4', (10, 2)),
 ('B007KWX7QK', (5, 1)),
 ('B000LKX9D4', (5, 1)),
 ('B002LFGNXA', (5, 1)),
 ('B003EI7V8O', (10, 2)),
 ('B001SB4IOY', (15, 3)),
 ('B000V1JU7M', (10, 2)),
 ('B001I4EDGO', (5, 1)),
 ('B007ZZQTS0', (5, 1)),
 ('B003WRLNO0', (5, 1))]

In [None]:
data.flatMap(lambda row: (row.Text.split())).map(lambda row: (row, 1)).reduceByKey(lambda a, b: a+b).takeOrdered(20, key = lambda x: -x[1])