### Iniciando a sessão Spark

In [None]:
from pyspark.sql import SparkSession
# Build the Spark session named "DemoSpark", reusing an existing one if present.
session_builder = SparkSession.builder.appName("DemoSpark")
spark = session_builder.getOrCreate()
# Bare name as the last expression so the notebook renders the session summary.
spark

### Leitura do arquivo csv

In [58]:
path_customer = 'data/customer.csv'
# Load the customer CSV: first line is the header, column types are inferred.
# The option key must be spelled `delimiter` (an alias of `sep`); the earlier
# `delimeter` spelling is not a CSV reader option, so the setting was not applied.
# Equivalent forms:
#   spark.read.csv(path = path_customer, header = True, inferSchema = True)
#   spark.read.option('delimiter', ',').option('header', True).csv(path = path_customer)
df_customer = spark.read.options(delimiter = ',', header = True, inferSchema = True).csv(path = path_customer)

# Preview the first five rows.
df_customer.show(5)

+-----------+----------+----------+----------+---------+------+-----+---------+-------------+------------+-----------------+--------------+-----+-----------------+-------+-------+-----------+----------+---+--------------------+-------------------+--------------------+----------+----------+
|CustomerKey|GeoAreaKey|   StartDT|     EndDT|Continent|Gender|Title|GivenName|MiddleInitial|     Surname|    StreetAddress|          City|State|        StateFull|ZipCode|Country|CountryFull|  Birthday|Age|          Occupation|            Company|             Vehicle|  Latitude| Longitude|
+-----------+----------+----------+----------+---------+------+-----+---------+-------------+------------+-----------------+--------------+-----+-----------------+-------+-------+-----------+----------+---+--------------------+-------------------+--------------------+----------+----------+
|         15|         4|1990-09-10|2034-07-29|Australia|  male|  Mr.|   Julian|            A|    McGuigan|25 Railway Street| Ca

In [None]:
# Show the column names of the DataFrame. A notebook cell only displays its
# last expression, so this mid-cell value is printed explicitly.
print(df_customer.columns)

# Select a subset of the DataFrame's columns and display the result.
customer_columns = ['CustomerKey', 'Continent', 'Gender', 'GivenName', 'MiddleInitial', 'Surname', 'City', 'StateFull', 'CountryFull', 'Age']
df_customer.select(customer_columns).show()




+-------+------------------+-------------+------+---------+-------------+-------+------+-------------+-------------+------------------+
|summary|       CustomerKey|    Continent|Gender|GivenName|MiddleInitial|Surname|  City|    StateFull|  CountryFull|               Age|
+-------+------------------+-------------+------+---------+-------------+-------+------+-------------+-------------+------------------+
|  count|            104990|       104990|104990|   104990|       104990| 104990|104990|       104990|       104990|            104990|
|   mean|1049779.4654538527|         NULL|  NULL|      NaN|          0.0|    NaN|  NULL|         NULL|         NULL|51.896332984093725|
| stddev| 607914.0568468425|         NULL|  NULL|      NaN|         NULL|    NaN|  NULL|         NULL|         NULL|19.359899289118264|
|    min|                15|    Australia|female|        0|            0|'t Hoen|  Aach|     Aberdeen|    Australia|                19|
|    max|           2099743|North America|  male

### Mostrar tipos de dados de cada coluna

In [36]:
# List the (column name, type name) pairs for the selected customer columns.
# Other ways to inspect the same selection / the full frame:
#   df_customer.select(customer_columns).describe().show()
#   df_customer.printSchema()
df_customer.select(*customer_columns).dtypes

[('CustomerKey', 'int'),
 ('Continent', 'string'),
 ('Gender', 'string'),
 ('GivenName', 'string'),
 ('MiddleInitial', 'string'),
 ('Surname', 'string'),
 ('City', 'string'),
 ('StateFull', 'string'),
 ('CountryFull', 'string'),
 ('Age', 'int')]

### Adicionar e remover colunas

In [50]:
# Add a derived column holding Age + 5, then preview only that column.
df_customer = df_customer.withColumn(
    'Age Added by 5',
    df_customer.Age + 5,
)
df_customer.select(['Age Added by 5']).show(5)

+--------------+
|Age Added by 5|
+--------------+
|            60|
|            35|
|            61|
|            79|
|            70|
+--------------+
only showing top 5 rows


In [51]:
# Remove the 'Age Added by 5' column again and preview the first five rows.
df_customer = df_customer.drop('Age Added by 5')
df_customer.show(n = 5)

+-----------+----------+----------+----------+---------+------+-----+---------+-------------+------------+-----------------+--------------+-----+-----------------+-------+-------+-----------+----------+---+--------------------+-------------------+--------------------+----------+----------+
|CustomerKey|GeoAreaKey|   StartDT|     EndDT|Continent|Gender|Title|GivenName|MiddleInitial|     Surname|    StreetAddress|          City|State|        StateFull|ZipCode|Country|CountryFull|  Birthday|Age|          Occupation|            Company|             Vehicle|  Latitude| Longitude|
+-----------+----------+----------+----------+---------+------+-----+---------+-------------+------------+-----------------+--------------+-----+-----------------+-------+-------+-----------+----------+---+--------------------+-------------------+--------------------+----------+----------+
|         15|         4|1990-09-10|2034-07-29|Australia|  male|  Mr.|   Julian|            A|    McGuigan|25 Railway Street| Ca

### Renomear a Coluna

In [55]:
# Rename several columns in one call using an {old name: new name} mapping.
# The renamed frame is only displayed; df_customer is not reassigned here.
# Single-column form: df_customer.withColumnRenamed('StartDT', 'StartDate').show()
renamed_columns = {'StartDT': 'StartDate', 'EndDT': 'EndDate'}
df_customer.withColumnsRenamed(renamed_columns).show(5)

+-----------+----------+----------+----------+---------+------+-----+---------+-------------+------------+-----------------+--------------+-----+-----------------+-------+-------+-----------+----------+---+--------------------+-------------------+--------------------+----------+----------+
|CustomerKey|GeoAreaKey| StartDate|   EndDate|Continent|Gender|Title|GivenName|MiddleInitial|     Surname|    StreetAddress|          City|State|        StateFull|ZipCode|Country|CountryFull|  Birthday|Age|          Occupation|            Company|             Vehicle|  Latitude| Longitude|
+-----------+----------+----------+----------+---------+------+-----+---------+-------------+------------+-----------------+--------------+-----+-----------------+-------+-------+-----------+----------+---+--------------------+-------------------+--------------------+----------+----------+
|         15|         4|1990-09-10|2034-07-29|Australia|  male|  Mr.|   Julian|            A|    McGuigan|25 Railway Street| Ca

### Mostrar linhas nulas

In [66]:
from pyspark.sql.functions import col
from functools import reduce

# One "is null" test per column of df_customer.
null_flags = (col(name).isNull() for name in df_customer.columns)
# OR the tests together: the condition is true when at least one column is null.
condition = reduce(lambda left, right: left | right, null_flags)
# Keep only the rows matching that condition and display them.
null_rows_df = df_customer.filter(condition)
null_rows_df.show()

+-----------+----------+----------+----------+-------------+------+-----+---------+-------------+-----------------+--------------------+-----------------+-----+------------------+-------+-------+-----------+----------+---+--------------------+-------+--------------------+----------+----------+
|CustomerKey|GeoAreaKey|   StartDT|     EndDT|    Continent|Gender|Title|GivenName|MiddleInitial|          Surname|       StreetAddress|             City|State|         StateFull|ZipCode|Country|CountryFull|  Birthday|Age|          Occupation|Company|             Vehicle|  Latitude| Longitude|
+-----------+----------+----------+----------+-------------+------+-----+---------+-------------+-----------------+--------------------+-----------------+-----+------------------+-------+-------+-----------+----------+---+--------------------+-------+--------------------+----------+----------+
|       5720|         2|1981-06-26|2018-11-27|    Australia|  male|  Mr.|   Bailey|            G|          Bracker|

### Remover linhas com qualquer valor nulo

In [72]:
# Display the customers after dropping rows whose Age is null.
# `subset` restricts the null check to Age, so nulls in other columns do not
# cause a row to be removed. No `thresh` is passed; per the PySpark docs it
# would instead keep rows having at least that many non-null values.
df_customer.dropna(how = 'any', subset = ['Age']).show()

+-----------+----------+----------+----------+---------+------+-----+---------+-------------+------------+--------------------+--------------+-----+-----------------+-------+-------+-----------+----------+---+--------------------+--------------------+--------------------+----------+----------+
|CustomerKey|GeoAreaKey|   StartDT|     EndDT|Continent|Gender|Title|GivenName|MiddleInitial|     Surname|       StreetAddress|          City|State|        StateFull|ZipCode|Country|CountryFull|  Birthday|Age|          Occupation|             Company|             Vehicle|  Latitude| Longitude|
+-----------+----------+----------+----------+---------+------+-----+---------+-------------+------------+--------------------+--------------+-----+-----------------+-------+-------+-----------+----------+---+--------------------+--------------------+--------------------+----------+----------+
|         15|         4|1990-09-10|2034-07-29|Australia|  male|  Mr.|   Julian|            A|    McGuigan|   25 Rai

### Preencher valores nulos

In [80]:
# Show the rows whose Company is missing, with the nulls in Company and Vehicle
# replaced by a placeholder.
# The null check runs BEFORE the fill so it tests the column it actually names.
# Filtering the filled frame with df_customer['Company'] relied on Spark
# resolving a column of the pre-fill DataFrame against the post-fill one.
(
    df_customer
    .filter(df_customer['Company'].isNull())
    .na.fill('Não Informado', subset = ['Company', 'Vehicle'])
    .show()
)

+-----------+----------+----------+----------+-------------+------+-----+---------+-------------+-----------------+--------------------+-----------------+-----+------------------+-------+-------+-----------+----------+---+--------------------+-------------+--------------------+----------+----------+
|CustomerKey|GeoAreaKey|   StartDT|     EndDT|    Continent|Gender|Title|GivenName|MiddleInitial|          Surname|       StreetAddress|             City|State|         StateFull|ZipCode|Country|CountryFull|  Birthday|Age|          Occupation|      Company|             Vehicle|  Latitude| Longitude|
+-----------+----------+----------+----------+-------------+------+-----+---------+-------------+-----------------+--------------------+-----------------+-----+------------------+-------+-------+-----------+----------+---+--------------------+-------------+--------------------+----------+----------+
|       5720|         2|1981-06-26|2018-11-27|    Australia|  male|  Mr.|   Bailey|            G|

### Filtrando valores

In [89]:
# Project key, given name and age, keep customers older than 50, show five rows.
(
    df_customer
    .select('CustomerKey', 'GivenName', 'Age')
    .filter('Age > 50')
    .show(5)
)

+-----------+---------+---+
|CustomerKey|GivenName|Age|
+-----------+---------+---+
|         15|   Julian| 55|
|         36|Annabelle| 56|
|        120|    Jamie| 74|
|        180|  Gabriel| 65|
|        189|   Hayley| 60|
+-----------+---------+---+
only showing top 5 rows


In [92]:
# Combine two column predicates: NOT (Age > 50) AND GivenName == 'Julian'.
# Each comparison is parenthesised because & and ~ bind tighter than > and ==.
not_over_fifty = ~(df_customer['Age'] > 50)
named_julian = (df_customer['GivenName'] == 'Julian')
df_customer.select('CustomerKey', 'GivenName', 'Age').filter(not_over_fifty & named_julian).show(5)

+-----------+---------+---+
|CustomerKey|GivenName|Age|
+-----------+---------+---+
|       3429|   Julian| 20|
|      13975|   Julian| 34|
|      30989|   Julian| 45|
|      37298|   Julian| 20|
|      43278|   Julian| 33|
+-----------+---------+---+
only showing top 5 rows


### Agrupamento e agregações

In [93]:
# from pyspark.sql import SparkSession
# spark = SparkSession.builder.appName('Demo').getOrCreate()

In [117]:
# Load the order-rows CSV (header row, inferred column types) and preview it.
path_orderrows = 'data/orderrows.csv'
df_orderrows = spark.read.options(header = True, inferSchema = True).csv(path_orderrows)
df_orderrows.show(5)

+--------+----------+----------+--------+---------+--------+--------+
|OrderKey|LineNumber|ProductKey|Quantity|UnitPrice|NetPrice|UnitCost|
+--------+----------+----------+--------+---------+--------+--------+
|  139000|         0|       153|       2|  375.976| 375.976| 172.896|
|  139000|         1|      1621|       8|    7.794|   7.794|   3.972|
|  139001|         0|       279|       3|    239.2|  227.24| 121.952|
|  139001|         1|      1806|       1|     22.4|    22.4|  11.417|
|  139002|         0|       125|       2|   114.72| 103.248|  58.488|
+--------+----------+----------+--------+---------+--------+--------+
only showing top 5 rows


In [118]:
# Add per-line totals: quantity times unit cost, and quantity times net price.
quantity = df_orderrows['Quantity']
line_totals = {
    'TotalCost': quantity * df_orderrows['UnitCost'],
    'TotalNetPrice': quantity * df_orderrows['NetPrice'],
}
df_orderrows = df_orderrows.withColumns(line_totals)

# Profit per line is derived from the two totals added above, so it needs its
# own step after they exist on the frame.
line_profit = df_orderrows['TotalNetPrice'] - df_orderrows['TotalCost']
df_orderrows = df_orderrows.withColumn('TotalProfit', line_profit)
df_orderrows.show()

+--------+----------+----------+--------+---------+--------+--------+------------------+-----------------+------------------+
|OrderKey|LineNumber|ProductKey|Quantity|UnitPrice|NetPrice|UnitCost|         TotalCost|    TotalNetPrice|       TotalProfit|
+--------+----------+----------+--------+---------+--------+--------+------------------+-----------------+------------------+
|  139000|         0|       153|       2|  375.976| 375.976| 172.896|           345.792|          751.952|            406.16|
|  139000|         1|      1621|       8|    7.794|   7.794|   3.972|            31.776|           62.352|30.575999999999997|
|  139001|         0|       279|       3|    239.2|  227.24| 121.952|           365.856|           681.72|315.86400000000003|
|  139001|         1|      1806|       1|     22.4|    22.4|  11.417|            11.417|             22.4|10.982999999999999|
|  139002|         0|       125|       2|   114.72| 103.248|  58.488|           116.976|          206.496| 89.52000000

In [123]:
# Total profit per (OrderKey, ProductKey) pair, highest totals first.
# Sort by the aggregate's generated column name instead of the ordinal position 3:
# the name states what is being ranked, keeps working if grouping keys are added
# or removed, and does not depend on ordinal support in DataFrame.orderBy.
(
    df_orderrows
    .groupBy(['OrderKey', 'ProductKey'])
    .sum('TotalProfit')
    .orderBy('sum(TotalProfit)', ascending = False)
    .show()
)

+--------+----------+------------------+
|OrderKey|ProductKey|  sum(TotalProfit)|
+--------+----------+------------------+
| 1477041|       600|           33420.6|
| 1377023|       540|           33420.6|
| 1272006|       564|           33420.6|
|  410008|       575|           30692.4|
| 1241025|       551|           30692.4|
| 1395030|       564|29422.199999999997|
| 1234036|       588|           28422.6|
|  677001|       575|          28332.45|
| 1741006|       633|          27623.16|
|  775007|       633|          26855.85|
| 1166002|       588|          26736.48|
| 1510020|       540|          26736.48|
| 1456003|       623|25580.339999999997|
| 1127017|       551|24731.459999999995|
| 1197005|       551|          24553.92|
| 1001016|       611|          24553.92|
| 1391017|       575|          24553.92|
|  658000|       633|           24446.1|
| 2318020|       588|24315.750000000004|
|  524003|       551|           24266.4|
+--------+----------+------------------+
only showing top