# Desafio Técnico para Data Engineer

### Inicializando PySpark e Bilbiotecas necessárias

In [1]:
import findspark
findspark.init()

In [2]:
import pyspark
import pandas as pd 
import os
from pyspark.sql import SparkSession
from pyspark.sql import Column
from pyspark.sql import Row
from pyspark.sql.types import *
from pyspark.sql.functions import col, isnan, when, count
spark = SparkSession.builder.getOrCreate()

In [3]:
### Criando uma pasta para receber os arquivos
os.makedirs('c:/arquivos', exist_ok=True)

### ETL, Lendo e Tratando o Arquivo csv - Netflix

In [4]:
# Extraindo o Arquivo Netflix e definindo o diretório de trabalho
from urllib import request
file_url = 'https://storage.googleapis.com/kaggle-data-sets/1636/792972/bundle/archive.zip?X-Goog-Algorithm=GOOG4-RSA-SHA256&X-Goog-Credential=gcp-kaggle-com%40kaggle-161607.iam.gserviceaccount.com%2F20220612%2Fauto%2Fstorage%2Fgoog4_request&X-Goog-Date=20220612T221602Z&X-Goog-Expires=259199&X-Goog-SignedHeaders=host&X-Goog-Signature=4dc002d5fe837939d480cbae4cc8d117451a40ad8e82641ef19b2879f943fece9d5af495179162ca7b8ff8be258ffce380698ab196300d0e2e664bd57900b0b35f46851ec9eab63a5dcbff48e6efb79cdc9a5a27537b77228314ed1a3b3505713830c80df5c9c39e62b566a9be3ae270c3eee96ccd2c0a9c5f1f87d14405fcaff67e2a494d572cb59c8d987a6091f3c10e90ec3668cfeb6fa78560f1b80f75dce84239ebd1fe827081d043e493bd37a343c0319180559c7e1a520cfbfade4ef53b2bb917553c7f292d6fe38cb1e81345efee33b6769bf2b1fffbbb9b25d8f1cbd472a1da2153fcabd521e517bcbcac439bec9018759e0c8aa1ec1f066ba3c2a4'
file = 'c:/arquivos/archive.zip'
request.urlretrieve(file_url, file)

('c:/arquivos/archive.zip', <http.client.HTTPMessage at 0x2a3860e1820>)

In [5]:
# Descompactando arquivo
from zipfile import ZipFile
z = ZipFile('c:/arquivos/archive.zip', 'r')
z.extractall('c:/arquivos/')
z.close()

In [6]:
# Definindo o nome das colunas
path = 'c:/arquivos/movie_titles.csv'
colunas = ['ano', 'filme']
netflix = pd.read_csv(path, header=None, index_col = 0, 
                      on_bad_lines='skip', names = colunas,
                      encoding='ANSI', sep=',')

In [7]:
# Remover Duplicados
netflix = netflix.drop_duplicates()

In [8]:
# Valores NaN na coluna ano
valores_ausentes_netflix = pd.isnull(netflix['ano'])
netflix[valores_ausentes_netflix] 

Unnamed: 0,ano,filme
4388,,Ancient Civilizations: Rome and Pompeii
4794,,Ancient Civilizations: Land of the Pharaohs
7241,,Ancient Civilizations: Athens and Greece
10782,,Roti Kapada Aur Makaan
15918,,Hote Hote Pyaar Ho Gaya
16678,,Jimmy Hollywood
17667,,Eros Dance Dhamaka


In [9]:
# Valores NaN na coluna filme
valores_ausentes_netflix = pd.isnull(netflix['filme'])
netflix[valores_ausentes_netflix] 

Unnamed: 0,ano,filme


In [10]:
netflix.info() #Identificada na coluna ano valores ausentes ou nulos

<class 'pandas.core.frame.DataFrame'>
Int64Index: 17754 entries, 1 to 17770
Data columns (total 2 columns):
 #   Column  Non-Null Count  Dtype  
---  ------  --------------  -----  
 0   ano     17747 non-null  float64
 1   filme   17754 non-null  object 
dtypes: float64(1), object(1)
memory usage: 416.1+ KB


In [11]:
# Convertendo e visualizando DF pandas para PySpark
df_netflix = spark.createDataFrame(netflix)
df_netflix.show()
df_netflix.printSchema()

+------+--------------------+
|   ano|               filme|
+------+--------------------+
|2003.0|     Dinosaur Planet|
|2004.0|Isle of Man TT 20...|
|1997.0|           Character|
|1994.0|Paula Abdul's Get...|
|2004.0|The Rise and Fall...|
|1997.0|                Sick|
|1992.0|               8 Man|
|2004.0|What the #$*! Do ...|
|1991.0|Class of Nuke 'Em...|
|2001.0|             Fighter|
|1999.0|Full Frame: Docum...|
|1947.0|My Favorite Brunette|
|2003.0|Lord of the Rings...|
|1982.0|  Nature: Antarctica|
|1988.0|Neil Diamond: Gre...|
|1996.0|           Screamers|
|2005.0|           7 Seconds|
|1994.0|    Immortal Beloved|
|2000.0|By Dawn's Early L...|
|1972.0|     Seeta Aur Geeta|
+------+--------------------+
only showing top 20 rows

root
 |-- ano: double (nullable = true)
 |-- filme: string (nullable = true)



In [12]:
# Contando Valores nulos ou nan ou ausentes
df_columns=['ano', 'filme']
df_netflix.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_columns]).show()

+---+-----+
|ano|filme|
+---+-----+
|  7|    0|
+---+-----+



In [13]:
# Removendovendo valores nan e nulos
df_netflix = df_netflix.dropna()

In [14]:
# Verificando Valores nulos ou nan ou ausentes
df_columns_netflix=['ano', 'filme']
df_netflix.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_columns_netflix]).show()

+---+-----+
|ano|filme|
+---+-----+
|  0|    0|
+---+-----+



In [15]:
# Ajustando ano para inteiro
df_netflix = df_netflix.withColumn('ano', 
                                  df_netflix['ano']
                                  .cast('int'))

In [16]:
# Converter em Pandas
netflix_p_df = df_netflix.toPandas()

In [17]:
# Criando csv
netflix_p_df.to_csv('c:/arquivos/netflix.csv', index=False)

### ETL, Lendo e Tratando o Arquivo tsv - Amazon

In [18]:
# Extraindo o Arquivo Amazon e definindo o diretório de trabalho
file_url = 'https://s3.amazonaws.com/amazon-reviews-pds/tsv/amazon_reviews_us_Video_v1_00.tsv.gz'
file = 'c:/arquivos/amazon_reviews_us_Video_v1_00.tsv.gz'
request.urlretrieve(file_url , file)

('c:/arquivos/amazon_reviews_us_Video_v1_00.tsv.gz',
 <http.client.HTTPMessage at 0x2a388a4f8b0>)

In [19]:
# Descompactar o arquivo
from pyunpack import Archive
Archive(file).extractall('c:/arquivos/')

In [20]:
# Convertendo tsv para csv
tsv_file= 'c:/arquivos/amazon_reviews_us_Video_v1_00.tsv'
csv_table=pd.read_table(tsv_file, sep='\t', on_bad_lines='skip')
csv_table.to_csv('c:/arquivos/amazon_reviews_us_Video_v1_00.csv', index=False)

In [21]:
# Convertendo e visualizando DF pandas para PySpark
amazon = pd.read_csv('c:/arquivos/amazon_reviews_us_Video_v1_00.csv')
amazon = amazon.drop(columns=['marketplace', 'customer_id','review_id', 'product_id', 'product_parent', 'helpful_votes',
                     'total_votes', 'vine', 'verified_purchase', 'review_headline', 'review_body'])
amazon.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 380551 entries, 0 to 380550
Data columns (total 4 columns):
 #   Column            Non-Null Count   Dtype 
---  ------            --------------   ----- 
 0   product_title     380551 non-null  object
 1   product_category  380551 non-null  object
 2   star_rating       380551 non-null  int64 
 3   review_date       380524 non-null  object
dtypes: int64(1), object(3)
memory usage: 11.6+ MB


In [22]:
# Remover Duplicados
amazon = amazon.drop_duplicates()

In [23]:
# Valores NaN na coluna product_title
valores_ausentes_amazon = pd.isnull(amazon['product_title'])
amazon[valores_ausentes_amazon] 

Unnamed: 0,product_title,product_category,star_rating,review_date


In [24]:
# Valores NaN na coluna product_category
valores_ausentes_amazon = pd.isnull(amazon['product_category'])
amazon[valores_ausentes_amazon] 

Unnamed: 0,product_title,product_category,star_rating,review_date


In [25]:
# Valores NaN na coluna star_rating
valores_ausentes_amazon = pd.isnull(amazon['star_rating'])
amazon[valores_ausentes_amazon] 

Unnamed: 0,product_title,product_category,star_rating,review_date


In [26]:
# Valores NaN na coluna review_date
valores_ausentes_amazon = pd.isnull(amazon['review_date'])
amazon[valores_ausentes_amazon] 

Unnamed: 0,product_title,product_category,star_rating,review_date
31878,Remo Williams [VHS],Video,5,
47228,"Katharine Hepburn: ""All About Me: A self Portrait",Video,5,
57339,Disney Cartoon Classics Volumes 6 - 10 Box Set,Video,2,
60859,Lawless Breed [VHS],Video,5,
75935,Tombstone [VHS],Video,5,
82958,The Night of the Iguana [VHS],Video,5,
94216,Three Stooges:Bird in the Head [VHS],Video,5,
98824,Western Justice [VHS],Video,5,
99222,Billy the Kid in Texas [VHS],Video,5,
105433,Purple Heart [VHS],Video,5,


In [27]:
# Excluindo Valores ausentes
amazon = amazon.dropna(axis=0)

In [28]:
# verificando informações do DF
amazon.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 371620 entries, 0 to 380550
Data columns (total 4 columns):
 #   Column            Non-Null Count   Dtype 
---  ------            --------------   ----- 
 0   product_title     371620 non-null  object
 1   product_category  371620 non-null  object
 2   star_rating       371620 non-null  int64 
 3   review_date       371620 non-null  object
dtypes: int64(1), object(3)
memory usage: 14.2+ MB


In [29]:
schema = StructType([StructField('product_title', StringType(), True)\
                     ,StructField('product_category', StringType(), True)\
                     ,StructField('star_rating', IntegerType(), True)\
                     ,StructField('review_date', StringType(), True)
                    ])

In [30]:
df_amazon = spark.createDataFrame(amazon, schema = schema)
df_amazon.printSchema()

root
 |-- product_title: string (nullable = true)
 |-- product_category: string (nullable = true)
 |-- star_rating: integer (nullable = true)
 |-- review_date: string (nullable = true)



In [31]:
# Resultados
df_amazon.show()
df_amazon.printSchema()

+--------------------+----------------+-----------+-----------+
|       product_title|product_category|star_rating|review_date|
+--------------------+----------------+-----------+-----------+
|The Night They Sa...|           Video|          5| 2015-08-31|
|Hamlet / Kline, N...|           Video|          5| 2015-08-31|
|Nascar Dual Power...|           Video|          4| 2015-08-31|
|The Man From U.N....|           Video|          5| 2015-08-31|
|Playboy Video Par...|           Video|          3| 2015-08-31|
|Cabaret Balkan - ...|           Video|          5| 2015-08-31|
|Wrinkles:in Need ...|           Video|          4| 2015-08-31|
|Ladies Club,the [...|           Video|          4| 2015-08-31|
|The Campitelli Ad...|           Video|          5| 2015-08-31|
|Return of the Jed...|           Video|          5| 2015-08-31|
|  Class of '61 [VHS]|           Video|          3| 2015-08-31|
|Texas Carnival [VHS]|           Video|          5| 2015-08-31|
|National Geograph...|           Video| 

In [32]:
# Verificando Valores nulos ou nan ou ausentes
df_columns_amazon=['product_title', 'product_category', 'star_rating', 'review_date']
df_amazon.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in df_columns_amazon]).show()

+-------------+----------------+-----------+-----------+
|product_title|product_category|star_rating|review_date|
+-------------+----------------+-----------+-----------+
|            0|               0|          0|          0|
+-------------+----------------+-----------+-----------+



In [33]:
# Converter em Pandas
amazon_p_df = df_amazon.toPandas()

In [34]:
# Criando csv
amazon_p_df.to_csv('c:/arquivos/amazon.csv', index=False)

# Conectar ao MySQL

In [35]:
import mysql.connector
from mysql.connector import Error

In [36]:
# Função para criar conexão com MySQL
def criar_conexao(host_name, user_name, user_password):
    conexao = None
    try:
        conexao = mysql.connector.connect(
            host=host_name,
            user=user_name,
            passwd=user_password
        )
        print('Conexão OK')
    except Error as err:
        print(f"Error: '{err}'")

    return conexao

In [41]:
# Testando Conexão
conexao = criar_conexao("localhost", "root", '123Qwe..@')

Conexão OK


In [42]:
# Função para criar uma banco de dados
def criar_db(connection, query):
    cursor = connection.cursor()
    try:
        cursor.execute(query)
        print("Banco Criado")
    except Error as err:
        print(f"Error: '{err}'")

In [43]:
# Criando DB
criar_db_q = 'CREATE DATABASE solvimm'

In [44]:
criar_db(conexao, criar_db_q)

Banco Criado


# Subir arquivos CSV para MySQL

In [45]:
# Função para rodar as consultas
def executar_q(conexao, query):
    cursor = conexao.cursor()
    try:
        cursor.execute(query)
        conexao.commit()
        print("Consulta Executada")
    except Error as err:
        print(f"Error: '{err}'")

In [46]:
# -*- coding: utf-8 -*-
import csv
import pymysql
import config

In [47]:
# Configurações do DB
config={
    'user':'root', #nome do Usuário do Banco
    'host':'localhost', #nome do Host do Banco
    'password':'123Qwe..@', #senha do Banco
    'database':'solvimm'#nome do banco de dados
}

In [48]:
# Conexão
conexao = pymysql.connect(**config, charset='utf8')
cursor = conexao.cursor()
input_file_netflix = csv.DictReader(open('c:/arquivos/netflix.csv', encoding='utf-8'))
input_file_amazon = csv.DictReader(open('c:/arquivos/amazon.csv', encoding='utf-8'))

In [49]:
# Usar db
usar_db_solvim = 'USE solvimm'
executar_q(conexao, usar_db_solvim)

Consulta Executada


In [50]:
# Criando a tabela neflix
criar_tabela_netflix = 'CREATE TABLE netflix (ano INTEGER, filme VARCHAR(255));'
executar_q(conexao, criar_tabela_netflix)

Consulta Executada


In [51]:
# Criando a tabela Amazon
criar_tabela_amazon = 'CREATE TABLE amazon (product_title LONGTEXT, product_category VARCHAR(255), star_rating INTEGER, review_date DATE);'
executar_q(conexao, criar_tabela_amazon)

Consulta Executada


In [52]:
# Injeção dos dados Netflix
for row in input_file_netflix:
    cursor.execute('INSERT INTO netflix (ano, filme) \
                VALUES (%s, %s)', (row['ano'], row['filme']))
    conexao.commit()

In [53]:
# Injeção dos dados Amazon
for row in input_file_amazon:
    cursor.execute('INSERT INTO amazon (product_title, product_category, star_rating, review_date) \
                VALUES (%s, %s, %s, %s)', (row['product_title'], row['product_category'], row['star_rating'], row['review_date']))
    conexao.commit()

In [54]:
# Vendo dados em Netflix
verifica_netflix = 'SELECT * FROM netflix;'
cursor.execute(verifica_netflix)
cursor.fetchall()
# Registros Inseridos
print("Número total de registros retornados: ", cursor.rowcount)

Número total de registros retornados:  17747


In [55]:
# Vendo dados em Amazon
verifica_amazon = 'SELECT * FROM amazon;'
cursor.execute(verifica_amazon)
cursor.fetchall()
# Registros Inseridos
print("Número total de registros retornados: ", cursor.rowcount)

Número total de registros retornados:  371620


In [56]:
conexao.close()