### Import o findspark

In [None]:
! pip install findspark

In [1]:

import findspark
findspark.init()

In [2]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import DataFrame, Row
from pyspark.sql import functions as F
from pyspark.sql.types import *

In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt # biblioteca de visualização utilizada pelo pandas e pelo seaborn
import seaborn as sns # biblioteca de visualização com mais opções de gráficos
%matplotlib inline

###  Criando uma Sessão Spark. Na sessão é possível configurar os nós do cluster, bem como a memória alocada para cada um deles, etc.

**Spark Session**: Fornece um único ponto de entrada para interagir com as funcionalidades fundamentais do Spark e permite programar o Spark com o Dataframe e outras APIs de Conjunto de Dados, como HIVE, Streaming e API Sql . Toda a funcionalidade disponível com o sparkContext também está disponível no sparkSession. Para usar APIs de SQL, HIVE e Streaming, não é necessário criar contextos separados, pois o sparkSession inclui todas as APIs.

In [4]:
spark = SparkSession.builder \
   .master("local") \
   .appName("Learn pyspark") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()

**Spark Context**: é usado como um canal para acessar as funcionalidade do spark.  O programa spark driver usa o contexto spark para conectar-se ao cluster através de  um gerenciador de recursos (YARN ouMesos ..).

In [5]:
sc = spark.sparkContext

## RDD

#### Lendo um arquivo

In [6]:
FILE_TRAIN_PATH = 'titanic_train.csv'
FILE_TEST_PATH = 'titanic_test.csv'

In [7]:
rdd = sc.textFile(FILE_TRAIN_PATH)

In [8]:
rdd.take(2) # Exibir os resultados

['PassengerId,Survived,Pclass,Name,Sex,Age,SibSp,Parch,Ticket,Fare,Cabin,Embarked',
 '1,0,3,"Braund, Mr. Owen Harris",male,22,1,0,A/5 21171,7.25,,S']

In [9]:
# Ler os dados e aplicar o split dos dados
rdd = rdd.map(lambda line: line.split(","))
rdd.take(2)

[['PassengerId',
  'Survived',
  'Pclass',
  'Name',
  'Sex',
  'Age',
  'SibSp',
  'Parch',
  'Ticket',
  'Fare',
  'Cabin',
  'Embarked'],
 ['1',
  '0',
  '3',
  '"Braund',
  ' Mr. Owen Harris"',
  'male',
  '22',
  '1',
  '0',
  'A/5 21171',
  '7.25',
  '',
  'S']]

In [10]:
# Verificar o primeiro elemento
rdd.first()

['PassengerId',
 'Survived',
 'Pclass',
 'Name',
 'Sex',
 'Age',
 'SibSp',
 'Parch',
 'Ticket',
 'Fare',
 'Cabin',
 'Embarked']

In [11]:
# Criando um rdd e definindo a estrutura da Row
rdd_to_df = rdd.map(lambda line: Row(PassengerId=line[0], Survived=line[1]))

In [12]:
df = rdd_to_df.toDF()

In [13]:
df.show(5)

+-----------+--------+
|PassengerId|Survived|
+-----------+--------+
|PassengerId|Survived|
|          1|       0|
|          2|       1|
|          3|       1|
|          4|       1|
+-----------+--------+
only showing top 5 rows



In [14]:
# Criando um rdd, aplicando filtro e definindo a estrutura da Row
rdd = sc.textFile(FILE_TRAIN_PATH) \
    .map(lambda line: line.split(",")) \
    .filter(lambda line: len(line)>1) \
    .map(lambda line: Row(PassengerId= line[0], Survived= line[1]))

In [15]:
# Removendo o header do arquivo CSV
new_rdd = rdd.subtract(sc.parallelize([rdd.first()]))

In [16]:
rdd.take(2)

[Row(PassengerId='PassengerId', Survived='Survived'),
 Row(PassengerId='1', Survived='0')]

In [17]:
new_rdd.take(2)

[Row(PassengerId='1', Survived='0'), Row(PassengerId='4', Survived='1')]

### O Famoso count words através da operação map reduce

In [18]:
counts = sc.textFile(FILE_TRAIN_PATH) \
        .flatMap(lambda line: line.split(",")) \
        .map(lambda word: (word, 1)) \
        .reduceByKey(lambda a, b: a + b)

In [19]:
counts.take(20)

[('PassengerId', 1),
 ('Survived', 1),
 ('Pclass', 1),
 ('Name', 1),
 ('Sex', 1),
 ('Age', 1),
 ('SibSp', 1),
 ('Parch', 1),
 ('Ticket', 1),
 ('Fare', 1),
 ('Cabin', 1),
 ('Embarked', 1),
 ('1', 893),
 ('0', 1850),
 ('3', 519),
 ('"Braund', 2),
 (' Mr. Owen Harris"', 1),
 ('male', 577),
 ('22', 28),
 ('A/5 21171', 1)]

## Data Frame

### Carregando o dado

In [20]:
data = spark.read.csv(FILE_TRAIN_PATH, header=True, inferSchema=True) #inferSchema=False, schema=schema
data

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

In [21]:
data = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(FILE_TRAIN_PATH)
data

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

#### A partir de um data frame pandas

In [22]:
schema = StructType([StructField("PassengerId", IntegerType(), True),
                     StructField("Survived", IntegerType(), True),
                     StructField("Pclass", IntegerType(), True),
                     StructField("Name", StringType(), True),
                     StructField("Sex", StringType(), True),
                     StructField("Age", DoubleType(), True),
                     StructField("SibSp", IntegerType(), True),
                     StructField("Parch", IntegerType(), True),
                     StructField("Ticket", StringType(), True),
                     StructField("Fare", DoubleType(), True),
                     StructField("Cabin", StringType(), True),
                     StructField("Embarked", StringType(), True)])

In [23]:
data = spark.read.csv(FILE_TRAIN_PATH, header=True, inferSchema=False, schema=schema) #inferSchema=False, schema=schema
data

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

In [24]:
df = pd.read_csv(FILE_TRAIN_PATH)
data = spark.createDataFrame(df, schema=schema)
data

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

#### A partir de um rdd

In [25]:
data_aux = spark.createDataFrame(new_rdd) #df = spark.createDataFrame(rdd, [columns])
data_aux

DataFrame[PassengerId: string, Survived: string]

In [26]:
### A partir de um arquivo parquet do HDFS ... sqlContext.read.parquet("...")

### Visualizando as dados e informações

In [27]:
data.head(2)

[Row(PassengerId=1, Survived=0, Pclass=3, Name='Braund, Mr. Owen Harris', Sex='male', Age=22.0, SibSp=1, Parch=0, Ticket='A/5 21171', Fare=7.25, Cabin='NaN', Embarked='S'),
 Row(PassengerId=2, Survived=1, Pclass=1, Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Sex='female', Age=38.0, SibSp=1, Parch=0, Ticket='PC 17599', Fare=71.2833, Cabin='C85', Embarked='C')]

In [28]:
data.show(2) # Printa as n primeira linhas no console.

+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|   Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|A/5 21171|   7.25|  NaN|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0| PC 17599|71.2833|  C85|       C|
+-----------+--------+------+--------------------+------+----+-----+-----+---------+-------+-----+--------+
only showing top 2 rows



In [29]:
data.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)



In [30]:
#Calcula estatísticas básicas para colunas numéricas e nominais.
data.select('Survived','Pclass','Sex','Age', 'Fare').describe().show(5) 

+-------+-------------------+------------------+------+----+-----------------+
|summary|           Survived|            Pclass|   Sex| Age|             Fare|
+-------+-------------------+------------------+------+----+-----------------+
|  count|                891|               891|   891| 891|              891|
|   mean| 0.3838383838383838| 2.308641975308642|  null| NaN| 32.2042079685746|
| stddev|0.48659245426485753|0.8360712409770491|  null| NaN|49.69342859718089|
|    min|                  0|                 1|female|0.42|              0.0|
|    max|                  1|                 3|  male| NaN|         512.3292|
+-------+-------------------+------------------+------+----+-----------------+



In [31]:
#Calcula mais informações estatísticas para colunas numéricas e nominais.
data.select('Survived','Pclass','Sex','Age', 'Fare').summary().show() 

+-------+-------------------+------------------+------+----+-----------------+
|summary|           Survived|            Pclass|   Sex| Age|             Fare|
+-------+-------------------+------------------+------+----+-----------------+
|  count|                891|               891|   891| 891|              891|
|   mean| 0.3838383838383838| 2.308641975308642|  null| NaN| 32.2042079685746|
| stddev|0.48659245426485753|0.8360712409770491|  null| NaN|49.69342859718089|
|    min|                  0|                 1|female|0.42|              0.0|
|    25%|                  0|                 2|  null|22.0|           7.8958|
|    50%|                  0|                 3|  null|32.0|          14.4542|
|    75%|                  1|                 3|  null|54.0|             31.0|
|    max|                  1|                 3|  male| NaN|         512.3292|
+-------+-------------------+------------------+------+----+-----------------+



### Removendo dados

In [32]:
data.dropna()

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

In [33]:
data.select('Survived','Pclass','Sex','Age', 'Fare').summary().show() 

+-------+-------------------+------------------+------+----+-----------------+
|summary|           Survived|            Pclass|   Sex| Age|             Fare|
+-------+-------------------+------------------+------+----+-----------------+
|  count|                891|               891|   891| 891|              891|
|   mean| 0.3838383838383838| 2.308641975308642|  null| NaN| 32.2042079685746|
| stddev|0.48659245426485753|0.8360712409770491|  null| NaN|49.69342859718089|
|    min|                  0|                 1|female|0.42|              0.0|
|    25%|                  0|                 2|  null|22.0|           7.8958|
|    50%|                  0|                 3|  null|32.0|          14.4542|
|    75%|                  1|                 3|  null|54.0|             31.0|
|    max|                  1|                 3|  male| NaN|         512.3292|
+-------+-------------------+------------------+------+----+-----------------+



In [34]:
data = data.dropna()

In [35]:
data.select('Survived','Pclass','Sex','Age', 'Fare').summary().show() 

+-------+------------------+------------------+------+------------------+-----------------+
|summary|          Survived|            Pclass|   Sex|               Age|             Fare|
+-------+------------------+------------------+------+------------------+-----------------+
|  count|               714|               714|   714|               714|              714|
|   mean|0.4061624649859944|2.2366946778711485|  null| 29.69911764705882|34.69451400560218|
| stddev|0.4914598643353704| 0.838249862698379|  null|14.526497332334035|52.91892950254356|
|    min|                 0|                 1|female|              0.42|              0.0|
|    25%|                 0|                 1|  null|              20.0|             8.05|
|    50%|                 0|                 2|  null|              28.0|          15.7417|
|    75%|                 1|                 3|  null|              38.0|             33.5|
|    max|                 1|                 3|  male|              80.0|       

In [36]:
data.select('Age','Sex').dropDuplicates().show(10)

+----+------+
| Age|   Sex|
+----+------+
|49.0|female|
|55.0|female|
|47.0|  male|
|0.83|  male|
|12.0|  male|
|66.0|  male|
|64.0|  male|
|18.0|female|
|36.0|  male|
|44.0|  male|
+----+------+
only showing top 10 rows



In [37]:
### Realizando input de todos os valores nulos pelo valor passado como parametro
data.fillna(100)

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

In [38]:
### Realizando input de dados apenas para as colunas especificadas
data.na.fill({'Age': 29, 'Name': 'Unknown'}).show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25|  NaN|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925|  NaN|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05|  NaN|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

### Realizando consultas

In [39]:
data.select('Name').show(5)

+--------------------+
|                Name|
+--------------------+
|Braund, Mr. Owen ...|
|Cumings, Mrs. Joh...|
|Heikkinen, Miss. ...|
|Futrelle, Mrs. Ja...|
|Allen, Mr. Willia...|
+--------------------+
only showing top 5 rows



In [40]:
data.select('Name','Age').show(5)

+--------------------+----+
|                Name| Age|
+--------------------+----+
|Braund, Mr. Owen ...|22.0|
|Cumings, Mrs. Joh...|38.0|
|Heikkinen, Miss. ...|26.0|
|Futrelle, Mrs. Ja...|35.0|
|Allen, Mr. Willia...|35.0|
+--------------------+----+
only showing top 5 rows



In [41]:
data.select('Name','Age').head(5)

[Row(Name='Braund, Mr. Owen Harris', Age=22.0),
 Row(Name='Cumings, Mrs. John Bradley (Florence Briggs Thayer)', Age=38.0),
 Row(Name='Heikkinen, Miss. Laina', Age=26.0),
 Row(Name='Futrelle, Mrs. Jacques Heath (Lily May Peel)', Age=35.0),
 Row(Name='Allen, Mr. William Henry', Age=35.0)]

In [42]:
data.selectExpr('(Age * 2) as New_Age').show(5)

+-------+
|New_Age|
+-------+
|   44.0|
|   76.0|
|   52.0|
|   70.0|
|   70.0|
+-------+
only showing top 5 rows



In [43]:
data.select('Age').distinct().count()

88

In [44]:
data.filter(data.Age < 18).show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch| Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|          8|       0|     3|Palsson, Master. ...|  male| 2.0|    3|    1| 349909| 21.075|  NaN|       S|
|         10|       1|     2|Nasser, Mrs. Nich...|female|14.0|    1|    0| 237736|30.0708|  NaN|       C|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1|PP 9549|   16.7|   G6|       S|
|         15|       0|     3|Vestrom, Miss. Hu...|female|14.0|    0|    0| 350406| 7.8542|  NaN|       S|
|         17|       0|     3|Rice, Master. Eugene|  male| 2.0|    4|    1| 382652| 29.125|  NaN|       Q|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
only showing top 5 rows



In [54]:
data.filter('Age < 18').show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch| Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|          8|       0|     3|Palsson, Master. ...|  male| 2.0|    3|    1| 349909| 21.075|  NaN|       S|
|         10|       1|     2|Nasser, Mrs. Nich...|female|14.0|    1|    0| 237736|30.0708|  NaN|       C|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1|PP 9549|   16.7|   G6|       S|
|         15|       0|     3|Vestrom, Miss. Hu...|female|14.0|    0|    0| 350406| 7.8542|  NaN|       S|
|         17|       0|     3|Rice, Master. Eugene|  male| 2.0|    4|    1| 382652| 29.125|  NaN|       Q|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
only showing top 5 rows



In [55]:
data.where('Age < 18').show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch| Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
|          8|       0|     3|Palsson, Master. ...|  male| 2.0|    3|    1| 349909| 21.075|  NaN|       S|
|         10|       1|     2|Nasser, Mrs. Nich...|female|14.0|    1|    0| 237736|30.0708|  NaN|       C|
|         11|       1|     3|Sandstrom, Miss. ...|female| 4.0|    1|    1|PP 9549|   16.7|   G6|       S|
|         15|       0|     3|Vestrom, Miss. Hu...|female|14.0|    0|    0| 350406| 7.8542|  NaN|       S|
|         17|       0|     3|Rice, Master. Eugene|  male| 2.0|    4|    1| 382652| 29.125|  NaN|       Q|
+-----------+--------+------+--------------------+------+----+-----+-----+-------+-------+-----+--------+
only showing top 5 rows



In [56]:
data.orderBy(data.Age.desc()).show(5) # Listar primeiro os mais idosos

+-----------+--------+------+--------------------+----+----+-----+-----+--------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|  Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+----+----+-----+-----+--------+-------+-----+--------+
|        631|       1|     1|Barkworth, Mr. Al...|male|80.0|    0|    0|   27042|   30.0|  A23|       S|
|        852|       0|     3| Svensson, Mr. Johan|male|74.0|    0|    0|  347060|  7.775|  NaN|       S|
|        494|       0|     1|Artagaveytia, Mr....|male|71.0|    0|    0|PC 17609|49.5042|  NaN|       C|
|         97|       0|     1|Goldschmidt, Mr. ...|male|71.0|    0|    0|PC 17754|34.6542|   A5|       C|
|        117|       0|     3|Connors, Mr. Patrick|male|70.5|    0|    0|  370369|   7.75|  NaN|       Q|
+-----------+--------+------+--------------------+----+----+-----+-----+--------+-------+-----+--------+
only showing top 5 rows



In [57]:
data.sort('Age', ascending=False).show(5) # Listar primeiro os mais idosos

+-----------+--------+------+--------------------+----+----+-----+-----+--------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name| Sex| Age|SibSp|Parch|  Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+----+----+-----+-----+--------+-------+-----+--------+
|        631|       1|     1|Barkworth, Mr. Al...|male|80.0|    0|    0|   27042|   30.0|  A23|       S|
|        852|       0|     3| Svensson, Mr. Johan|male|74.0|    0|    0|  347060|  7.775|  NaN|       S|
|        494|       0|     1|Artagaveytia, Mr....|male|71.0|    0|    0|PC 17609|49.5042|  NaN|       C|
|         97|       0|     1|Goldschmidt, Mr. ...|male|71.0|    0|    0|PC 17754|34.6542|   A5|       C|
|        117|       0|     3|Connors, Mr. Patrick|male|70.5|    0|    0|  370369|   7.75|  NaN|       Q|
+-----------+--------+------+--------------------+----+----+-----+-----+--------+-------+-----+--------+
only showing top 5 rows



In [58]:
data_subtract = data.select('PassengerId') \
                .subtract(data.filter(data.PassengerId == 1).select('PassengerId'))

In [59]:
data_subtract.filter('PassengerId in (1,2)').show()

+-----------+
|PassengerId|
+-----------+
|          2|
+-----------+



In [60]:
data.select(data.PassengerId, data.Fare,
          F.when(data.Fare <= 50, 'cheaper') \
          .otherwise('expensive').alias('new_category')
           ).show(5)

+-----------+-------+------------+
|PassengerId|   Fare|new_category|
+-----------+-------+------------+
|          1|   7.25|     cheaper|
|          2|71.2833|   expensive|
|          3|  7.925|     cheaper|
|          4|   53.1|   expensive|
|          5|   8.05|     cheaper|
+-----------+-------+------------+
only showing top 5 rows



In [61]:
data.select(data.PassengerId, data.Fare,
          F.when(data.Fare <= 50, 'cheaper') \
            .when((data.Fare > 50) & (data.Fare < 100), 'cheaper_2') \
            .otherwise('expensive').alias('new_category')).show(5)

+-----------+-------+------------+
|PassengerId|   Fare|new_category|
+-----------+-------+------------+
|          1|   7.25|     cheaper|
|          2|71.2833|   cheaper_2|
|          3|  7.925|     cheaper|
|          4|   53.1|   cheaper_2|
|          5|   8.05|     cheaper|
+-----------+-------+------------+
only showing top 5 rows



#### Realizando join

In [62]:
fake_data = spark.read.csv('titanic_train_fake.csv', header=True, inferSchema=True) #inferSchema=False, schema=schema
fake_data

DataFrame[PassengerId: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

In [63]:
data.join(fake_data, data.PassengerId == fake_data.PassengerId)\
    .select(fake_data.PassengerId.alias('PI'), data.PassengerId.alias('DataPassengerId'), 'Survived', 'Other_Info').show(5)

AnalysisException: "cannot resolve '`Other_Info`' given input columns: [Pclass, Survived, Name, Parch, PassengerId, Parch, Fare, Sex, Name, Ticket, Ticket, Cabin, SibSp, Age, SibSp, Embarked, Age, PassengerId, Pclass, Sex, Embarked, Cabin, Fare];;\n'Project [PassengerId#2247 AS PI#2338, PassengerId#104 AS DataPassengerId#2339, Survived#105, 'Other_Info]\n+- AnalysisBarrier\n      +- Join Inner, (PassengerId#104 = PassengerId#2247)\n         :- Filter AtLeastNNulls(n, PassengerId#104,Survived#105,Pclass#106,Name#107,Sex#108,Age#109,SibSp#110,Parch#111,Ticket#112,Fare#113,Cabin#114,Embarked#115)\n         :  +- LogicalRDD [PassengerId#104, Survived#105, Pclass#106, Name#107, Sex#108, Age#109, SibSp#110, Parch#111, Ticket#112, Fare#113, Cabin#114, Embarked#115], false\n         +- Relation[PassengerId#2247,Pclass#2248,Name#2249,Sex#2250,Age#2251,SibSp#2252,Parch#2253,Ticket#2254,Fare#2255,Cabin#2256,Embarked#2257] csv\n"

In [64]:
fake_data = fake_data.alias('dado_fake')
data = data.alias('data')
data.join(fake_data, F.col("dado_fake.PassengerId") == F.col("data.PassengerId"), 'inner').show(5)

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+-----------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|PassengerId|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+-----------+------+----+---+---+-----+-----+------+----+-----+--------+
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+-----------+------+----+---+---+-----+-----+------+----+-----+--------+



### Agrupar dados

In [None]:
data.groupBy("Pclass") \
.agg(F.sum('Fare').alias('soma'), F.mean('Fare').alias('media'),  F.count("*")) \
.show(5) ### Agrupar os dados por um determinado pivô.

In [None]:
data.groupBy("Pclass") \
    .agg(F.sum('Fare').alias("soma")) \
    .sort('soma', descending=True).show(5) 

In [None]:
# We can use crosstab operation on DataFrame to calculate the pair wise frequency of columns. 
data.crosstab('Pclass', 'Sex').show()

### Criando uma nova coluna novos dados

In [None]:
data.withColumn('New_Fare', (data['Fare'] + 1.5).cast(FloatType())).show(5)

#### Através de udfs

In [None]:
def is_cheaper(fare, base_fare):
    return fare>base_fare

In [None]:
data.withColumn('Is_Cheaper', is_cheaper(data['Fare'], 50).cast(BooleanType())).show(5)

In [None]:
is_cheaper_udf = F.udf(lambda fare, base_fare: fare>base_fare, BooleanType())

In [None]:
data.withColumn('Is_Cheaper', is_cheaper_udf(data['Fare'], F.lit(50))).show(5)

#### Visualizar colunas

In [None]:
data.columns

#### Removendo colunas

In [None]:
data.drop('SibSp', 'Ticket').show(5)

In [None]:
data.drop('SibSp').columns

#### Renomeando colunas

In [None]:
data.withColumnRenamed('Sex', 'Gender').show(5)

### Mais operações

In [None]:
data.rdd

In [None]:
data.select('Age', 'Sex').rdd.map(lambda x:(x[0], x[1], 1)).take(5)

### SQL Queries on DataFrame

In [None]:
TEMP_TABLE = 'train_temp_table'

In [None]:
data.createOrReplaceTempView(TEMP_TABLE) ## Temp table createOrReplaceTempView

In [None]:
spark.sql('select * from {temp_table}'.format(temp_table=TEMP_TABLE)).show(5)

In [None]:
spark.sql('select Age, max(Fare) from {temp_table} group by Age order by Age desc'.format(temp_table=TEMP_TABLE)).show(5)

### Converter para dataFrame pandas

In [None]:
dataPandasDF = data.toPandas()

In [None]:
type(dataPandasDF)

In [None]:
dataPandasDF.head(5)

### Finalizando o Spark Job

In [None]:
spark.stop()

### Referências

- https://www.cetax.com.br/blog/tutorial-pyspark-e-mllib/
- http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html
- https://www.analyticsvidhya.com/blog/2016/10/spark-dataframe-and-operations/
- http://spark.apache.org/docs/2.1.0/api/python/pyspark.sql.html#pyspark.sql.DataFrame

<!--
# Import 'DenseVector'
from pyspark.ml.linalg import DenseVector

##### Define the 'input_data'
input_data = data.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

#### Import 'DenseVector'
from pyspark.ml.linalg import DenseVector

#### Define the 'input_data'
input_data = data.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

#### Replace 'df' with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])

#=========================================================================================================
#### Import "StandardScaler"
from pyspark.ml.feature import StandardScaler

#### Initialize the "standardScaler"
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

#### Fit the DataFrame to the scaler
scaler = standardScaler.fit(df)

#### Transform the data in 'df' with the scaler
scaled_df = scaler.transform(df)

#### Inspect the result
scaled_df.show(2)

#=========================================================================================================

from pyspark.ml.feature import PCA

pca = PCA(k=3, inputCol="features_scaled", outputCol="pcaFeatures")
model = pca.fit(scaled_df)

result = model.transform(scaled_df).select("pcaFeatures")
result.show(truncate=False) Import 'DenseVector'
from pyspark.ml.linalg import DenseVector

#### Define the 'input_data'
input_data = data.rdd.map(lambda x: (x[0], DenseVector(x[1:])))

#### Replace 'df' with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])

#=========================================================================================================
#### Import "StandardScaler"
from pyspark.ml.feature import StandardScaler

#### Initialize the "standardScaler"
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

#### Fit the DataFrame to the scaler
scaler = standardScaler.fit(df)

#### Transform the data in 'df' with the scaler
scaled_df = scaler.transform(df)

#### Inspect the result
scaled_df.show(2)

#=========================================================================================================

from pyspark.ml.feature import PCA

pca = PCA(k=3, inputCol="features_scaled", outputCol="pcaFeatures")
model = pca.fit(scaled_df)

result = model.transform(scaled_df).select("pcaFeatures")
result.show(truncate=False) Replace 'df' with the new DataFrame
df = spark.createDataFrame(input_data, ["label", "features"])

#=========================================================================================================
#### Import "StandardScaler"
from pyspark.ml.feature import StandardScaler

#### Initialize the "standardScaler"
standardScaler = StandardScaler(inputCol="features", outputCol="features_scaled")

#### Fit the DataFrame to the scaler
scaler = standardScaler.fit(df)

#### Transform the data in 'df' with the scaler
scaled_df = scaler.transform(df)

#### Inspect the result
scaled_df.show(2)

#=========================================================================================================

from pyspark.ml.feature import PCA

pca = PCA(k=3, inputCol="features_scaled", outputCol="pcaFeatures")
model = pca.fit(scaled_df)

result = model.transform(scaled_df).select("pcaFeatures")
result.show(truncate=False)

-->