# Python – ETL com PySpark
- MongoDB Atlas: https://cloud.mongodb.com/v2#

## Importação das bibliotecas

In [None]:
from pyspark.sql.session import SparkSession
from pyspark.sql.types import (ArrayType, BooleanType, FloatType,
                               IntegerType, LongType, StringType,
                               StructField, StructType, TimestampType)
import pyspark.sql.functions as F

## Criando a sessão Spark

In [None]:
# Local Spark session used by the whole ETL.
# The shuffle-partition setting is `spark.sql.shuffle.partitions`; a misspelled
# key such as `spark.shuffle.sql.partitions` is accepted without error but never
# read, so Spark would silently keep its default (200) partitions.
# NOTE(review): with 'local' master the executor runs inside the driver JVM, so
# `spark.executor.memory` likely has no effect here; `spark.driver.memory`
# (set before the JVM starts) is what bounds memory — confirm.
spark = (
    SparkSession.builder.appName('ETL_example')
    .config('spark.master', 'local')
    .config('spark.executor.memory', '2gb')
    .config('spark.sql.shuffle.partitions', 2)
    .getOrCreate()
)

## Extract

In [None]:
# Explicit schema for the headerless CSV (the first row shown is data), in the
# file's column order.
# `_id` is read as LongType: tweet ids later in this file exceed 2,147,483,647
# (the IntegerType maximum), and with IntegerType Spark's permissive CSV parser
# would turn those values into null.
schema = StructType([
    StructField('target', StringType()),
    StructField('_id', LongType()),
    StructField('date', StringType()),
    StructField('flag', StringType()),
    StructField('user', StringType()),
    StructField('text', StringType()),
])

# Location of the input file (a Colab-style /content path).
path = '/content/training.1600000.processed.noemoticon.csv'

### Criando o DataFrame

In [None]:
# Read the CSV with the explicit schema defined above.
# The Sentiment140 file is not valid UTF-8 (it is conventionally read as
# Latin-1). Without the `encoding` option Spark decodes it as UTF-8 and
# replaces non-ASCII characters in `user`/`text` with garbage.
df = (
    spark.read.format('csv')
    .schema(schema)
    .option('encoding', 'ISO-8859-1')
    .load(path)
)

In [None]:
# Inspect the column types, then preview the first five rows.
df.printSchema()
df.show(n=5)

root
 |-- target: string (nullable = true)
 |-- _id: integer (nullable = true)
 |-- date: string (nullable = true)
 |-- flag: string (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)

+------+----------+--------------------+--------+---------------+--------------------+
|target|       _id|                date|    flag|           user|                text|
+------+----------+--------------------+--------+---------------+--------------------+
|     0|1467810369|Mon Apr 06 22:19:...|NO_QUERY|_TheSpecialOne_|@switchfoot http:...|
|     0|1467810672|Mon Apr 06 22:19:...|NO_QUERY|  scotthamilton|is upset that he ...|
|     0|1467810917|Mon Apr 06 22:19:...|NO_QUERY|       mattycus|@Kenichan I dived...|
|     0|1467811184|Mon Apr 06 22:19:...|NO_QUERY|        ElleCTF|my whole body fee...|
|     0|1467811193|Mon Apr 06 22:19:...|NO_QUERY|         Karoli|@nationwideclass ...|
+------+----------+--------------------+--------+---------------+--------------------+

## Transform

In [None]:
# Remove the 'target' and 'flag' columns; they are not carried into the load step.
colunas_descartadas = ['target', 'flag']
df = df.drop(*colunas_descartadas)
df.show(n=3)

+----------+--------------------+---------------+--------------------+
|       _id|                date|           user|                text|
+----------+--------------------+---------------+--------------------+
|1467810369|Mon Apr 06 22:19:...|_TheSpecialOne_|@switchfoot http:...|
|1467810672|Mon Apr 06 22:19:...|  scotthamilton|is upset that he ...|
|1467810917|Mon Apr 06 22:19:...|       mattycus|@Kenichan I dived...|
+----------+--------------------+---------------+--------------------+
only showing top 3 rows



### Separando a coluna date

In [None]:
# Split the fixed-width `date` string into its parts using 1-based
# (start, length) substrings, e.g. "Mon Apr 06 22:19:45 ... 2009".
# Characters 20-24 (presumably the timezone token — confirm) are not kept.
# Insertion order of this dict fixes the order of the new columns.
posicoes_data = {
    'day_week': (1, 3),
    'day': (9, 2),
    'month': (5, 3),
    'time': (12, 8),
    'year': (25, 4),
}
for coluna, (inicio, tamanho) in posicoes_data.items():
    df = df.withColumn(coluna, df['date'].substr(inicio, tamanho))
df = df.drop('date')

In [None]:
df.show(n=5)  # preview the split date columns

+----------+---------------+--------------------+--------+---+-----+--------+----+
|       _id|           user|                text|day_week|day|month|    time|year|
+----------+---------------+--------------------+--------+---+-----+--------+----+
|1467810369|_TheSpecialOne_|@switchfoot http:...|     Mon| 06|  Apr|22:19:45|2009|
|1467810672|  scotthamilton|is upset that he ...|     Mon| 06|  Apr|22:19:49|2009|
|1467810917|       mattycus|@Kenichan I dived...|     Mon| 06|  Apr|22:19:53|2009|
|1467811184|        ElleCTF|my whole body fee...|     Mon| 06|  Apr|22:19:57|2009|
|1467811193|         Karoli|@nationwideclass ...|     Mon| 06|  Apr|22:19:57|2009|
+----------+---------------+--------------------+--------+---+-----+--------+----+
only showing top 5 rows



### Convertendo Tipos de Dados

In [None]:
def converter_coluna(dataframe, nomes, novoTipo):
    """Cast one or more columns of a DataFrame to a new type.

    Args:
        dataframe: the (Spark) DataFrame whose columns are cast.
        nomes: a single column name, or an iterable of column names. A plain
            string is treated as one column name rather than being iterated
            character by character.
        novoTipo: the target type, passed unchanged to ``Column.cast``
            (a ``DataType`` instance or a type-name string).

    Returns:
        The DataFrame returned by the last ``withColumn`` call, with every
        listed column replaced by its cast value (the input DataFrame itself
        is returned when ``nomes`` is empty).
    """
    # A bare str is iterable; without this, 'day' would be cast as 'd', 'a', 'y'.
    if isinstance(nomes, str):
        nomes = [nomes]
    for nome in nomes:
        dataframe = dataframe.withColumn(nome, dataframe[nome].cast(novoTipo))
    return dataframe

# Column groups and the type each group is cast to.
# day_week/month are already strings after the substr split, so that cast is
# effectively a no-op; `year` is not listed and stays a string.
coluna_string = ['day_week', 'month']
coluna_inteiro = ['day']
coluna_time = ['time']

# NOTE(review): `time` holds only "HH:MM:SS"; casting a time-only string to
# TimestampType appears to fill in the *current* date, not the tweet's date —
# confirm whether a timestamp built from year/month/day/time is intended.
for colunas, tipo in ((coluna_string, StringType()),
                      (coluna_inteiro, IntegerType()),
                      (coluna_time, TimestampType())):
    df = converter_coluna(df, colunas, tipo)

df.printSchema()

root
 |-- _id: integer (nullable = true)
 |-- user: string (nullable = true)
 |-- text: string (nullable = true)
 |-- day_week: string (nullable = true)
 |-- day: integer (nullable = true)
 |-- month: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- year: string (nullable = true)



## Load

### Conexão com o MongoDB

In [None]:
def get_database(connection_string=None, db_name='etl_soul_on'):
    """Return a handle to the MongoDB database used by this ETL.

    Args:
        connection_string: MongoDB connection URI (e.g. an Atlas
            ``mongodb+srv://`` URI). When omitted or empty, it is read from the
            ``MONGODB_URI`` environment variable, so credentials do not have to
            be hard-coded in the notebook. If neither is set, an empty string
            is passed to ``MongoClient``.
        db_name: name of the database to return; defaults to ``'etl_soul_on'``.

    Returns:
        The pymongo database object ``client[db_name]``.
    """
    import os

    from pymongo import MongoClient

    if not connection_string:
        connection_string = os.environ.get('MONGODB_URI', '')
    client = MongoClient(connection_string)

    return client[db_name]

In [None]:
N_LINHAS = 20  # number of rows sent to MongoDB

dbname = get_database()  # connect to the database
collection_name = dbname['data_load']  # the 'data_load' collection (MongoDB creates it on first insert)

# Take the sample and convert it to pandas under its own name, so `df` keeps
# referring to the full Spark DataFrame after this cell.
amostra = df.limit(N_LINHAS).toPandas()
data_dict = amostra.to_dict('records')  # list with one dict per row

# pymongo's insert_many rejects an empty list, so only call it when there is data.
if data_dict:
    collection_name.insert_many(data_dict)
    print('Data Frame importado com sucesso!')
else:
    print('Nenhuma linha para importar.')


  series = series.astype(t, copy=False)


Data Frame importado com sucesso!
