In [17]:
!pip install pyspark



In [None]:
from pyspark.sql import SparkSession

# Cria a sessão do Spark
spark = SparkSession.builder.appName("CSV to Parquet").getOrCreate()

# Caminho do arquivo CSV de entrada
input_path = "/content/drive/MyDrive/Colab Notebooks/desafio_parte3/movies.csv"

# Lê o arquivo CSV
data_frame = spark.read.format("csv").option("header", "true").option("inferSchema", "true").option("delimiter", "|").load(input_path)

# Remove as colunas indesejadas
columns_to_drop = ["anoFalecimento", "profissao", "titulosMaisConhecidos"]
data_frame = data_frame.drop(*columns_to_drop)

# Converte os tipos de dados necessários
data_frame = data_frame.withColumn("anoLancamento", data_frame["anoLancamento"].cast("integer"))
data_frame = data_frame.withColumn("tempoMinutos", data_frame["tempoMinutos"].cast("integer"))
data_frame = data_frame.withColumn("notaMedia", data_frame["notaMedia"].cast("float"))
data_frame = data_frame.withColumn("numeroVotos", data_frame["numeroVotos"].cast("integer"))
data_frame = data_frame.withColumn("anoNascimento", data_frame["anoNascimento"].cast("integer"))

# Mostra o resultado em DataFrame
data_frame.show()

# Encerra a sessão do Spark
spark.stop()

In [None]:
from pyspark.sql import SparkSession

# Cria a sessão do Spark
spark = SparkSession.builder.appName("CSV to Parquet").getOrCreate()

# Caminho do arquivo CSV de entrada
input_path = "/content/drive/MyDrive/Colab Notebooks/desafio_parte3/movies.csv"

# Lê o arquivo CSV
data_frame = spark.read.format("csv").option("header", "true").option("inferSchema", "true").option("delimiter", "|").load(input_path)

# Remove as colunas indesejadas
columns_to_drop = ["anoFalecimento", "profissao", "titulosMaisConhecidos"]
data_frame = data_frame.drop(*columns_to_drop)

# Converte os tipos de dados necessários
data_frame = data_frame.withColumn("anoLancamento", data_frame["anoLancamento"].cast("integer"))
data_frame = data_frame.withColumn("tempoMinutos", data_frame["tempoMinutos"].cast("integer"))
data_frame = data_frame.withColumn("notaMedia", data_frame["notaMedia"].cast("float"))
data_frame = data_frame.withColumn("numeroVotos", data_frame["numeroVotos"].cast("integer"))
data_frame = data_frame.withColumn("anoNascimento", data_frame["anoNascimento"].cast("integer"))

# Substitui os valores "\N" por nulos (None)
data_frame = data_frame.na.replace("\\N", None)

# Mostra o resultado em DataFrame
data_frame.show()

# Encerra a sessão do Spark
spark.stop()

## Dropando as colunas "anoFalecimento", "profissao", "titulosMaisConhecidos"

In [None]:
import pandas as pd

# Define o número máximo de linhas a serem exibidas
pd.set_option('display.max_rows', 100)  # Define 1000 como o número máximo de linhas a serem exibidas

# Caminho do arquivo CSV de entrada
input_path = "/content/drive/MyDrive/Colab Notebooks/desafio_parte3/movies.csv"

# Lê o arquivo CSV usando pandas
data_frame = pd.read_csv(input_path, delimiter="|")

# Remove as colunas indesejadas
columns_to_drop = ["anoFalecimento", "profissao", "titulosMaisConhecidos"]
data_frame.drop(columns_to_drop, axis=1, inplace=True)

# Converte os tipos de dados necessários
data_frame["anoLancamento"] = pd.to_numeric(data_frame["anoLancamento"], errors="coerce")
data_frame["tempoMinutos"] = pd.to_numeric(data_frame["tempoMinutos"], errors="coerce")
data_frame["notaMedia"] = pd.to_numeric(data_frame["notaMedia"], errors="coerce")
data_frame["numeroVotos"] = pd.to_numeric(data_frame["numeroVotos"], errors="coerce")
data_frame["anoNascimento"] = pd.to_numeric(data_frame["anoNascimento"], errors="coerce")

# Substitui os valores "\N" por nulos (NaN)
data_frame.replace("\\N", pd.NA, inplace=True)

# Mostra o resultado em DataFrame
data_frame.head(20)

# # Salva o DataFrame em formato Parquet (opcional)
# output_path = "/content/drive/MyDrive/Colab Notebooks/desafio_parte3/movies.parquet"
# data_frame.to_parquet(output_path, engine="pyarrow")

## Dropando registros NA

In [None]:
import pandas as pd

# Define o número máximo de linhas a serem exibidas
pd.set_option('display.max_rows', 100)  # Define 1000 como o número máximo de linhas a serem exibidas

# Caminho do arquivo CSV de entrada
input_path = "/content/drive/MyDrive/Colab Notebooks/desafio_parte3/movies.csv"

# Lê o arquivo CSV usando pandas
data_frame = pd.read_csv(input_path, delimiter="|")

# Remove as colunas indesejadas
columns_to_drop = ["anoFalecimento", "profissao", "titulosMaisConhecidos"]
data_frame.drop(columns_to_drop, axis=1, inplace=True)

# Converte os tipos de dados necessários
data_frame["anoLancamento"] = pd.to_numeric(data_frame["anoLancamento"], errors="coerce")
data_frame["tempoMinutos"] = pd.to_numeric(data_frame["tempoMinutos"], errors="coerce")
data_frame["notaMedia"] = pd.to_numeric(data_frame["notaMedia"], errors="coerce")
data_frame["numeroVotos"] = pd.to_numeric(data_frame["numeroVotos"], errors="coerce")
data_frame["anoNascimento"] = pd.to_numeric(data_frame["anoNascimento"], errors="coerce")

# Substitui os valores "\N" por nulos (NaN)
data_frame.replace("\\N", pd.NA, inplace=True)

# Dropa os registros que possuem valores NA
data_frame.dropna(inplace=True)

# Mostra o resultado em DataFrame
data_frame.head(10)


## Remover duplicados

In [None]:
import pandas as pd

# Define o número máximo de linhas a serem exibidas
pd.set_option('display.max_rows', 100)  # Define 1000 como o número máximo de linhas a serem exibidas

# Caminho do arquivo CSV de entrada
input_path = "/content/drive/MyDrive/Colab Notebooks/desafio_parte3/movies.csv"

# Lê o arquivo CSV usando pandas
data_frame = pd.read_csv(input_path, delimiter="|")

# Remove as colunas indesejadas
columns_to_drop = ["anoFalecimento", "profissao", "titulosMaisConhecidos"]
data_frame.drop(columns_to_drop, axis=1, inplace=True)

# Converte os tipos de dados necessários
data_frame["anoLancamento"] = pd.to_numeric(data_frame["anoLancamento"], errors="coerce")
data_frame["tempoMinutos"] = pd.to_numeric(data_frame["tempoMinutos"], errors="coerce")
data_frame["notaMedia"] = pd.to_numeric(data_frame["notaMedia"], errors="coerce")
data_frame["numeroVotos"] = pd.to_numeric(data_frame["numeroVotos"], errors="coerce")
data_frame["anoNascimento"] = pd.to_numeric(data_frame["anoNascimento"], errors="coerce")

# Substitui os valores "\N" por nulos (NaN)
data_frame.replace("\\N", pd.NA, inplace=True)

# Dropa os registros que possuem valores NA
data_frame.dropna(inplace=True)

# Remove os IDs duplicados
data_frame.drop_duplicates(subset="id", keep="first", inplace=True)

# Mostra o resultado em DataFrame
data_frame.head(100)


## Remover duplicados Pyspark

In [None]:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Cria uma sessão Spark
spark = SparkSession.builder.appName("Pandas para PySpark").getOrCreate()

# Define o caminho de entrada
input_path = "/content/drive/MyDrive/Colab Notebooks/desafio_parte3/movies.csv"

# Lê o arquivo CSV usando Spark
data_frame = spark.read.option("delimiter", "|").csv(input_path, header=True)

# Remove colunas indesejadas
columns_to_drop = ["anoFalecimento", "profissao", "titulosMaisConhecidos"]
data_frame = data_frame.drop(*columns_to_drop)

# Converte os tipos de dados necessários
data_frame = data_frame.withColumn("anoLancamento", col("anoLancamento").cast("int")) \
    .withColumn("tempoMinutos", col("tempoMinutos").cast("int")) \
    .withColumn("notaMedia", col("notaMedia").cast("float")) \
    .withColumn("numeroVotos", col("numeroVotos").cast("int")) \
    .withColumn("anoNascimento", col("anoNascimento").cast("int"))

# Substitui os valores "\N" por nulos (None)
data_frame = data_frame.replace("\\N", None)

# Remove registros com valores nulos
data_frame = data_frame.na.drop()

# Remove IDs duplicados
data_frame = data_frame.dropDuplicates(["id"])

# Mostra o resultado como um DataFrame
data_frame.show(100)


## Apenas genero de Horror

In [None]:
import pandas as pd
from pyspark.sql.functions import col

# Define o número máximo de linhas a serem exibidas
pd.set_option('display.max_rows', 100)  # Define 1000 como o número máximo de linhas a serem exibidas

# Caminho do arquivo CSV de entrada
input_path = "/content/drive/MyDrive/Colab Notebooks/desafio_parte3/movies.csv"

# Lê o arquivo CSV usando pandas
data_frame = pd.read_csv(input_path, delimiter="|")

# Remove as colunas indesejadas
columns_to_drop = ["anoFalecimento", "profissao", "titulosMaisConhecidos"]
data_frame.drop(columns_to_drop, axis=1, inplace=True)

# Converte os tipos de dados necessários
data_frame["anoLancamento"] = pd.to_numeric(data_frame["anoLancamento"], errors="coerce")
data_frame["tempoMinutos"] = pd.to_numeric(data_frame["tempoMinutos"], errors="coerce")
data_frame["notaMedia"] = pd.to_numeric(data_frame["notaMedia"], errors="coerce")
data_frame["numeroVotos"] = pd.to_numeric(data_frame["numeroVotos"], errors="coerce")
data_frame["anoNascimento"] = pd.to_numeric(data_frame["anoNascimento"], errors="coerce")

# Substitui os valores "\N" por nulos (NaN)
data_frame.replace("\\N", pd.NA, inplace=True)

# Dropa os registros que possuem valores NA
data_frame.dropna(inplace=True)

# Remove os IDs duplicados
data_frame.drop_duplicates(subset="id", keep="first", inplace=True)

# Filtra os registros por gênero "Terror" e "Horror"
terror_horror_movies = data_frame[data_frame["genero"].str.contains("Horror")]
# terror_horror_movies = data_frame.where(col("genero").contains("Horror"))


# Mostra o resultado em DataFrame
data_frame.head(50)


## Apenas genero de Horror Pyspark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower, array_contains

# Cria uma sessão Spark
spark = SparkSession.builder.appName("Pandas para PySpark").getOrCreate()

# Define o caminho de entrada
input_path = "/content/drive/MyDrive/Colab Notebooks/desafio_parte3/movies.csv"

# Lê o arquivo CSV usando Spark
data_frame = spark.read.option("delimiter", "|").csv(input_path, header=True)

# Remove colunas indesejadas
columns_to_drop = ["anoFalecimento", "profissao", "titulosMaisConhecidos"]
data_frame = data_frame.drop(*columns_to_drop)

# Converte os tipos de dados necessários
data_frame = data_frame.withColumn("anoLancamento", col("anoLancamento").cast("int")) \
    .withColumn("notaMedia", col("notaMedia").cast("float")) \
    .withColumn("numeroVotos", col("numeroVotos").cast("int")) \
    .withColumn("anoNascimento", col("anoNascimento").cast("int"))

# Substitui os valores "\N" por nulos (None)
data_frame = data_frame.replace("\\N", None)

# Remove registros com valores nulos
data_frame = data_frame.na.drop()

# Remove IDs duplicados
data_frame = data_frame.dropDuplicates(["id"])

# Filtra os registros que contenham na coluna "genero" os valores "terror" e "horror"
# data_frame_genero = data_frame.where(col("genero").contains("Horror"))

data_frame_genero = data_frame.where(col("genero").contains("Horror")).select("id", "tituloPincipal", "anoLancamento", "genero", "notaMedia", "numeroVotos")

# Mostra o resultado como um DataFrame
data_frame_genero.show(10)

+---------+--------------------+-------------+--------------------+---------+-----------+
|       id|      tituloPincipal|anoLancamento|              genero|notaMedia|numeroVotos|
+---------+--------------------+-------------+--------------------+---------+-----------+
|tt0003419|The Student of Pr...|         1913|Drama,Fantasy,Horror|      6.4|       2171|
|tt0004121|The Hound of the ...|         1914|Crime,Horror,Mystery|      5.5|        119|
|tt0005231|The Hound of the ...|         1915|Crime,Horror,Mystery|      3.2|         41|
|tt0005951|    Satan's Rhapsody|         1917|Drama,Fantasy,Horror|      6.8|        748|
|tt0006820| Homunculus, 1. Teil|         1916|       Horror,Sci-Fi|      5.9|        101|
|tt0007183| The Queen of Spades|         1916|Drama,Fantasy,Horror|      6.7|        761|
|tt0007983|              Furcht|         1917|              Horror|      6.2|        184|
|tt0008099|Hilde Warren und ...|         1917|              Horror|      5.8|        128|
|tt0008252

In [None]:
data_frame_genero.count()

13025

In [None]:
data_frame_genero.printSchema()

root
 |-- id: string (nullable = true)
 |-- tituloPincipal: string (nullable = true)
 |-- anoLancamento: integer (nullable = true)
 |-- genero: string (nullable = true)
 |-- notaMedia: float (nullable = true)
 |-- numeroVotos: integer (nullable = true)



## Converter em pastas particionadas para csv e visualizar o schema das colunas

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower
from datetime import datetime

# Cria uma sessão Spark
spark = SparkSession.builder.appName("Pandas para PySpark").getOrCreate()

# Define o caminho de entrada
input_path = "/content/drive/MyDrive/Colab Notebooks/desafio_parte3/movies.csv"

# Lê o arquivo CSV usando Spark
data_frame = spark.read.option("delimiter", "|").csv(input_path, header=True)

# Remove colunas indesejadas
columns_to_drop = ["anoFalecimento", "profissao", "titulosMaisConhecidos", "personagem", "tempoMinutos"]
data_frame = data_frame.drop(*columns_to_drop)

# Converte os tipos de dados necessários
data_frame = data_frame.withColumn("anoLancamento", col("anoLancamento").cast("int")) \
    .withColumn("notaMedia", col("notaMedia").cast("float")) \
    .withColumn("numeroVotos", col("numeroVotos").cast("int")) \
    .withColumn("anoNascimento", col("anoNascimento").cast("int"))

# Substitui os valores "\N" por nulos (None)
data_frame = data_frame.replace("\\N", None)

# Remove registros com valores nulos
data_frame = data_frame.na.drop()

# Remove IDs duplicados
data_frame = data_frame.dropDuplicates(["id"])

# Filtra os registros que contenham na coluna "genero" os valores "terror" e "horror"
# data_frame_genero = data_frame.filter((lower(col("genero")) == "horror"))
data_frame_genero = data_frame.where(col("genero").contains("Horror"))

# Define o caminho de saída
now = datetime.now()
output_path = f"/content/drive/MyDrive/Colab Notebooks/desafio_parte3/trusted/csv/movies/output_csv/{now.year}/{now.month}/{now.day}/"

# # Salva o resultado em um arquivo CSV no caminho especificado
# data_frame_genero.write.csv(output_path, header=True)


# Salva o resultado em um único arquivo CSV no caminho especificado
data_frame_genero.write.mode("overwrite").csv(output_path, header=True)

import os

# Cria o diretório se ele não existir
os.makedirs(output_path, exist_ok=True)
# print(data_frame_genero.schema)
# print(data_frame_genero.show(50))
# Salva o resultado em um único arquivo CSV no caminho especificado
data_frame_pandas = data_frame_genero.toPandas()
data_frame_pandas.to_csv(output_path + "/result.csv", index=False)


In [None]:
data_frame_genero.count()

13722

## codigo csv IMDB para parquet particionado

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower
from datetime import datetime
import os

# Cria uma sessão Spark
spark = SparkSession.builder.appName("Pandas para PySpark").getOrCreate()

# Define o caminho de entrada
input_path = "/content/drive/MyDrive/Colab Notebooks/desafio_parte3/movies.csv"

# Lê o arquivo CSV usando Spark
data_frame = spark.read.option("delimiter", "|").csv(input_path, header=True)

# Remove colunas indesejadas
columns_to_drop = ["anoFalecimento", "profissao", "titulosMaisConhecidos", "personagem", "tempoMinutos"]
data_frame = data_frame.drop(*columns_to_drop)

# Converte os tipos de dados necessários
data_frame = data_frame.withColumn("anoLancamento", col("anoLancamento").cast("int")) \
    .withColumn("notaMedia", col("notaMedia").cast("float")) \
    .withColumn("numeroVotos", col("numeroVotos").cast("int")) \
    .withColumn("anoNascimento", col("anoNascimento").cast("int"))

# Substitui os valores "\N" por nulos (None)
data_frame = data_frame.replace("\\N", None)

# Remove registros com valores nulos
data_frame = data_frame.na.drop()

# Remove IDs duplicados
data_frame = data_frame.dropDuplicates(["id"])

# Filtra os registros que contenham na coluna "genero" os valores "terror" e "horror"
data_frame_genero = data_frame.where(col("genero").contains("Horror"))

# Cria o diretório se ele não existir
os.makedirs(output_path, exist_ok=True)

# Define o caminho de saída
now = datetime.now()
output_path = f"/content/drive/MyDrive/Colab Notebooks/desafio_parte3/trusted/csv/movies/output_parquet/{now.year}/{now.month}/{now.day}"


# Salva o resultado em formato Parquet no caminho especificado
data_frame_genero.write.mode("overwrite").parquet(output_path)


In [None]:
data_frame_genero.show(100)

In [None]:
data_frame_genero.count()

14643

## Para AWS Glue

In [None]:
from datetime import datetime
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Cria uma sessão Spark
spark_context = SparkContext()
glue_context = GlueContext(spark_context)
spark = glue_context.spark_session

# Define o caminho de entrada
input_path = "s3://data-lake-heliton/input/movies.csv"

# Lê o arquivo CSV usando Spark
data_frame = spark.read.option("delimiter", "|").csv(input_path, header=True)

# Remove colunas indesejadas
columns_to_drop = ["anoFalecimento", "profissao", "titulosMaisConhecidos", "personagem"]
data_frame = data_frame.drop(*columns_to_drop)

# Converte os tipos de dados necessários
data_frame = data_frame.withColumn("anoLancamento", col("anoLancamento").cast("int")) \
    .withColumn("tempoMinutos", col("tempoMinutos").cast("int")) \
    .withColumn("notaMedia", col("notaMedia").cast("float")) \
    .withColumn("numeroVotos", col("numeroVotos").cast("int")) \
    .withColumn("anoNascimento", col("anoNascimento").cast("int"))

# Substitui os valores "\N" por nulos (None)
data_frame = data_frame.replace("\\N", None)

# Remove registros com valores nulos
data_frame = data_frame.na.drop()

# Remove IDs duplicados
data_frame = data_frame.dropDuplicates(["id"])

# Filtra os registros que contenham na coluna "genero" os valores "horror" e "terror"
ddata_frame_genero = data_frame.where(col("genero").contains("Horror"))

# Define o caminho de saída
now = datetime.now()
output_path = f"s3://data-lake-heliton/trusted/csv_imdb/parquet/{now.year}/{now.month}/{now.day}"

# Salva o resultado em formato Parquet no caminho especificado
data_frame_genero.write.mode("overwrite").parquet(output_path)


In [None]:
from datetime import datetime
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import sys

# Cria uma sessão Spark
spark_context = SparkContext()
glue_context = GlueContext(spark_context)
spark = glue_context.spark_session

# Obter os argumentos resolvidos
args = getResolvedOptions(sys.argv, ['JOB_NAME', 'S3_INPUT_PATH', 'S3_TARGET_PATH'])

# Define o caminho de entrada e o caminho de saída
input_path = args['S3_INPUT_PATH']
output_path = args['S3_TARGET_PATH']

# Lê o arquivo CSV usando Spark
data_frame = spark.read.option("delimiter", "|").csv(input_path, header=True)

# Remove colunas indesejadas
columns_to_drop = ["anoFalecimento", "profissao", "titulosMaisConhecidos", "personagem"]
data_frame = data_frame.drop(*columns_to_drop)

# Converte os tipos de dados necessários
data_frame = data_frame.withColumn("anoLancamento", col("anoLancamento").cast("int")) \
    .withColumn("tempoMinutos", col("tempoMinutos").cast("int")) \
    .withColumn("notaMedia", col("notaMedia").cast("float")) \
    .withColumn("numeroVotos", col("numeroVotos").cast("int")) \
    .withColumn("anoNascimento", col("anoNascimento").cast("int"))

# Substitui os valores "\N" por nulos (None)
data_frame = data_frame.replace("\\N", None)

# Remove registros com valores nulos
data_frame = data_frame.na.drop()

# Remove IDs duplicados
data_frame = data_frame.dropDuplicates(["id"])

# Filtra os registros que contenham na coluna "genero" os valores "horror" e "terror"
data_frame_genero = data_frame.filter((col("genero").contains("horror")) | (col("genero").contains("terror")))

# Salva o resultado em formato Parquet no caminho especificado
data_frame_genero.write.mode("overwrite").parquet(output_path)

# s3://data-lake-heliton/RAW/Local/CSV/Movies/2023/6/15/