# Apache Spark com Python
#### Garoa Data Science 20171018

Variável de Contexto (funciona como acessor para o cluster Spark)

In [None]:
sc

## Criando RDDs

In [None]:
my_data = [1, 2, 3, 4]

In [None]:
my_rdd = sc.parallelize(my_data)
my_rdd

### Ações
São operações que tomam um RDD como entrada e retornam outro tipo de dado *para o programa principal*

**collect**: retorna uma lista contendo todos os elementos do RDD

In [None]:
my_rdd.collect()

**take**: retorna os n primeiros elementos do RDD

In [None]:
my_rdd.take(2)

**count**: retorna o length do RDD

In [None]:
my_rdd.count()

**reduce**: combina os elementos do RDD em paralelo

In [None]:
my_rdd.reduce(lambda acc, new: acc + new)

**foreach**: aplica uma função a cada elemento do RDD. Não retorna nada

In [None]:
def f(x):
    return x + 1
my_rdd.foreach(f)

### Transformações
São operações que *transformam* um RDD em outro. Em geral, seguem o paradigma de *lazy evaluation*.

**map**

In [None]:
my_transformed_rdd = my_rdd.map(lambda x: x**2)

In [None]:
my_rdd.collect()

In [None]:
my_transformed_rdd.collect()

**flatMap**: mesma coisa do que o map, só que retorna tudo em uma só lista

In [None]:
my_text = sc.parallelize([
            'No meio do caminho tinha uma pedra',
            'tinha uma pedra no meio do caminho',
            'tinha uma pedra',
            'no meio do caminho tinha uma pedra',
            'Nunca me esquecerei desse acontecimento',
            'na vida de minhas retinas tão fatigadas',
            'Nunca me esquecerei que no meio do caminho',
            'tinha uma pedra',
            'tinha uma pedra no meio do caminho',
            'no meio do caminho tinha uma pedra'])

In [None]:
my_text.map(lambda line: line.split()).collect()

In [None]:
my_text.flatMap(lambda line: line.split()).collect()

### Word count

In [None]:
temp = my_text.flatMap(lambda line: line.split()).map(lambda x: (x, 1)).reduceByKey(lambda x, y: x + y)

In [None]:
temp.sortBy(lambda x: x[1], ascending=False).collect()

**filter**: retorna um RDD que contém apenas os elementos que passaram por uma certa condição

In [None]:
my_rdd.filter(lambda x: x %2 == 0).collect()

**Desafio**: calcular o valor de pi usando o Método Monte Carlo em *Python puro* e Spark. Comparar os tempos de execução 

In [None]:
import numpy as np

In [None]:
n = 1000000

In [None]:
seeds = sc.parallelize(range(n))
pi_ = seeds.map(lambda seed: np.random.seed(seed) or np.random.rand(2, 10000))\
     .map(lambda pair: 4*np.mean(pair[0]**2 + pair[1]**2 <= 1))\
     .mean()
np.abs(100*(np.pi - pi_))

In [None]:
seeds = sc.parallelize(range(n))
pi_ = seeds.map(lambda seed: np.random.seed(seed) or np.random.rand(2))\
           .map(lambda pair: pair[0]**2 + pair[1]**2 <= 1)\
           .mean()
np.abs(np.pi - 4*pi_)

Outra forma de criar um RDD

In [None]:
import ibmos2spark

# @hidden_cell
credentials = {
    'auth_url': 'https://identity.open.softlayer.com',
    'project_id': '90a4103b7b014ee9b3a296b606adfd2c',
    'region': 'dallas',
    'user_id': '74e4420fd3a54fe69c15cf3deecbb553',
    'username': 'member_9e322ac0978be20f6b6d1fe9c9ca667d8354f3ff',
    'password': 'I[PVWf_MD{Dx^10#'
}

configuration_name = 'os_6a1cd4de5fda4aed9daa77a16acbd0eb_configs'
bmos = ibmos2spark.bluemix(sc, credentials, configuration_name)

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
# Please read the documentation of PySpark to learn more about the possibilities to load data files.
# PySpark documentation: https://spark.apache.org/docs/2.0.1/api/python/pyspark.sql.html#pyspark.sql.SparkSession
# The SparkSession object is already initalized for you.
# The following variable contains the path to your file on your Object Storage.
path_1 = bmos.url('DefaultProjectgabrielcasarinibmcom', 'README.md')


In [None]:
lines = sc.textFile(path_1)

In [None]:
lines.take(3)

In [None]:
lines.filter(lambda line: 'Python' in line).collect()

**Desafio**: retornar a linha juntamente com o seu índice

In [None]:
lines.zipWithIndex().filter(lambda pair: 'Python' in pair[0]).collect()

### Dataframes

In [None]:
my_df = spark.createDataFrame([('A', 1), ('B', 2), ('C', 5), ('D', 6)], ['Col1', 'Col2'])

In [None]:
my_df.columns

In [None]:
my_df.head(5)

In [None]:
df = spark.read\
  .format('org.apache.spark.sql.execution.datasources.csv.CSVFileFormat')\
  .option('header', 'true')\
  .load(bmos.url('DefaultProjectgabrielcasarinibmcom', 'iris.csv'))
df.take(5)

In [None]:
df.head(3)

In [None]:
df[df.name == 'setosa'].collect()

In [None]:
df[(df.name == 'setosa') & (df.sepal_width > 4.0)].collect()

In [None]:
import pyspark.sql.functions as fs

In [None]:
df.groupBy('name').agg(\
       fs.mean('sepal_length').alias('mean_length'),\
       fs.stddev('sepal_length').alias('length_std'))\
  .collect()

In [None]:
new_df = df.withColumn('new_feature', df.sepal_length * df.sepal_width)

In [None]:
new_df.head(5)

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import *

In [None]:
my_udf = udf(lambda feat1, feat2: feat1 * feat2)
new_df = new_df.withColumn('new_feature2', my_udf(new_df.sepal_length, new_df.sepal_width))

In [None]:
new_df.head(5)

Site legal: https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/