### Instalando dependencias e iniciando bibliotecas

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
!pip install pyspark
!pip install boto3
!pip install awscli
!pip install jsonlines

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488493 sha256=5834e2f9f509beef010aba6b0ca1775e6bb2d57807e3436215fe830893bca935
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1
Collecting boto3
  Downloading boto3-1.34.59-py3-none-any.whl (139 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.3/139.3 kB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting botocore<1.35.0,>=1.34.59 (from boto3)
  Downloading botocore-1.3

In [3]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext as sc
from pyspark.sql.session import SparkSession
import pyspark
from pyspark.sql.functions import col, lit, collect_list
from pyspark.sql.types import StructType, StructField, StringType, LongType, ArrayType, DoubleType, IntegerType, FloatType, BooleanType
from pyspark.sql import functions as F
import boto3
import datetime
import pandas as pd
import json
import requests
import glob
import math
from pathlib import Path

## CSV

### Lendo a base de dados CSV

In [None]:
# prompt: crir um codigo em pyspark para ler os dados de um csv numa pasta do google colab

# Criar uma SparkSession
spark = SparkSession.builder.appName("LeituraCSV").getOrCreate()

# Definir o caminho para o arquivo CSV
caminho_csv = "/content/drive/MyDrive/ProjetoBolsa/dados/dadosTrustedCSV/movies_romance_pronto.csv"

# Ler o arquivo CSV usando o Spark
df = spark.read.csv(caminho_csv, header=True, inferSchema=True)

# Imprimir o DataFrame
df.show(5, truncate=False)


+---------+-------------------------+-------------+-----------------------+---------+-----------+
|id       |tituloPincipal           |anoLancamento|genero                 |notaMedia|numeroVotos|
+---------+-------------------------+-------------+-----------------------+---------+-----------+
|tt0000009|Miss Jerry               |1894         |Romance                |5.3      |200        |
|tt0001175|Camille                  |1912         |Drama,Romance          |5.3      |38         |
|tt0001475|Amor fatal               |1911         |Drama,Romance          |7.3      |21         |
|tt0003442|Tess of the D'Urbervilles|1913         |Drama,Romance          |5.9      |25         |
|tt0004207|The Last Egyptian        |1914         |Adventure,Drama,Romance|5.2      |22         |
+---------+-------------------------+-------------+-----------------------+---------+-----------+
only showing top 5 rows



### Tirando Colunas Desnecessarias

In [None]:
from pyspark.sql import SparkSession

# Inicialize a sessão do Spark
spark = SparkSession.builder \
    .appName("RemoveColumnsAndSaveCSV") \
    .getOrCreate()

# Caminho do arquivo CSV
file_path = "/content/drive/MyDrive/ProjetoBolsa/dados/dadosRaw/movies.csv"

# Ler o arquivo CSV
df = spark.read.csv(file_path, sep='|', header=True)

# Remover colunas
columns_to_drop = ['tituloOriginal', 'tempoMinutos', 'generoArtista', 'personagem', 'nomeArtista', 'anoNascimento', 'anoFalecimento', 'profissao', 'titulosMaisConhecidos']
df = df.drop(*columns_to_drop)

# Converter DataFrame do Spark para DataFrame do Pandas
df_pandas = df.toPandas()

# Caminho para salvar o novo arquivo CSV
output_path = "/content/drive/MyDrive/ProjetoBolsa/dados/dadosRaw/movies_colunas.csv"

# Salvar o DataFrame resultante em um novo arquivo CSV usando Pandas
df_pandas.to_csv(output_path, sep='|', index=False)

# Encerrar a sessão do Spark
spark.stop()


In [None]:
import pandas as pd

pd.read_csv('/content/drive/MyDrive/ProjetoBolsa/dados/dadosRaw/movies.csv', sep='|')\
    .drop(columns=['tituloOriginal', 'tempoMinutos', 'generoArtista','personagem','nomeArtista','anoNascimento','anoFalecimento','profissao','titulosMaisConhecidos'])\
    .to_csv('/content/drive/MyDrive/ProjetoBolsa/dados/dadosRaw/movies_colunas.csv', sep='|', index=False)

  pd.read_csv('/content/drive/MyDrive/ProjetoBolsa/dados/dadosRaw/movies.csv', sep='|')\


### Filtrando os Filmes com Genero Romance


In [None]:
# Iniciar a sessão Spark
spark = SparkSession.builder \
    .appName("Filtragem de Dados") \
    .getOrCreate()
# Caminho do arquivo CSV de origem
caminho_arquivo_origem = '/content/drive/MyDrive/ProjetoBolsa/dados/dadosRaw/movies_colunas.csv'
# Caminho completo do arquivo CSV de destino
caminho_arquivo_destino = '/content/drive/MyDrive/ProjetoBolsa/dados/dadosTrustedCSV/movies_romance.csv'
# Coluna que deseja filtrar
coluna_alvo = 'genero'
# Valor a ser filtrado
valor_filtrado = 'Romance'

try:
    # Carregar o arquivo CSV como um DataFrame Spark
    df = spark.read.option("delimiter", "|").csv(caminho_arquivo_origem, header=True)
    # Filtrar os dados com base na coluna alvo e no valor desejado
    dados_filtrados = df.filter(col(coluna_alvo).contains(valor_filtrado))
    # Converter o DataFrame Spark em um DataFrame pandas
    dados_pandas = dados_filtrados.toPandas()
    # Salvar os dados filtrados como um arquivo CSV
    dados_pandas.to_csv(caminho_arquivo_destino, index=False)
    print("Dados filtrados salvos com sucesso em", caminho_arquivo_destino)
except Exception as e:
    print("Ocorreu um erro:", e)
# Encerrar a sessão Spark
spark.stop()

Dados filtrados salvos com sucesso em /content/drive/MyDrive/ProjetoBolsa/dados/dadosTrustedCSV/movies_romance.csv


##### Exibindo dados

In [None]:
spark = SparkSession.builder.appName("LeituraCSV").getOrCreate()

# Definir o caminho para o arquivo CSV
caminho_csv = "/content/drive/MyDrive/ProjetoBolsa/dados/dadosTrustedCSV/movies_romance.csv"

# Ler o arquivo CSV usando o Spark
df_romance = spark.read.csv(caminho_csv, header=True, inferSchema=True)

# Imprimir o DataFrame
df_romance.show(5, truncate=False)


+---------+--------------+--------------------+-------------+------------+-------------+---------+-----------+-------------+--------------------------------------------------------------+-----------------+-------------+--------------+----------------------------+---------------------------------------+
|id       |tituloPincipal|tituloOriginal      |anoLancamento|tempoMinutos|genero       |notaMedia|numeroVotos|generoArtista|personagem                                                    |nomeArtista      |anoNascimento|anoFalecimento|profissao                   |titulosMaisConhecidos                  |
+---------+--------------+--------------------+-------------+------------+-------------+---------+-----------+-------------+--------------------------------------------------------------+-----------------+-------------+--------------+----------------------------+---------------------------------------+
|tt0000009|Miss Jerry    |Miss Jerry          |1894         |45          |Romance      |

### Filtrando os ID's unicos

### Id's unicos Pyspark CSV Romance

In [None]:
#Este está correto porem gera varios arquivos
def ler_ids_unicos_e_salvar(caminho_arquivo_csv, pasta_destino):
    # Inicia uma sessão Spark
    spark = SparkSession.builder \
        .appName("Ler IDs Únicos e Salvar") \
        .getOrCreate()

    # Carrega o arquivo CSV como um DataFrame Spark
    df = spark.read.option("delimiter", "|").option("header", "true").csv(caminho_arquivo_csv)


    # Extrai os IDs únicos da coluna 'id' e realiza a operação de filtro em um único passo
    df_resultado = df.dropDuplicates(['id'])

    # Salva os dados dos IDs únicos como um arquivo CSV diretamente
    df_resultado.write.option("header", "true").csv(pasta_destino + '/movies_ids_unicos_py.csv')

    # Encerra a sessão Spark
    spark.stop()


caminho_arquivo_csv = "/content/drive/MyDrive/ProjetoBolsa/dados/dadosRaw/movies_colunas.csv"
pasta_destino = "/content/drive/MyDrive/ProjetoBolsa/dados/dadosTrustedCSV"
ler_ids_unicos_e_salvar(caminho_arquivo_csv, pasta_destino)


#### junção dos dados csv do pysarpk


In [None]:
def juntar_arquivos_csv(pasta_origem, nome_arquivo_saida):
    # Lista todos os arquivos CSV na pasta de origem
    arquivos_csv = glob.glob(pasta_origem + '/*.csv')

    # Lista para armazenar os DataFrames de cada arquivo CSV
    dfs = []

    # Lê cada arquivo CSV e adiciona seu DataFrame à lista de DataFrames
    for arquivo_csv in arquivos_csv:
        df = pd.read_csv(arquivo_csv)
        dfs.append(df)

    # Concatena todos os DataFrames em um único DataFrame
    df_concatenado = pd.concat(dfs, ignore_index=True)

    # Salva o DataFrame concatenado como um arquivo CSV
    df_concatenado.to_csv(nome_arquivo_saida, index=False)

# Pasta de origem contendo os arquivos CSV gerados pelo código anterior
pasta_origem = "/content/drive/MyDrive/ProjetoBolsa/dados/dadosTrustedCSV/movies_ids_unicos_py.csv"

# Nome do arquivo de saída que conterá todos os dados juntos
nome_arquivo_saida = "/content/drive/MyDrive/ProjetoBolsa/dados/dadosTrustedCSV/movies_ids_unicos.csv"

# Chama a função para juntar os arquivos CSV
juntar_arquivos_csv(pasta_origem, nome_arquivo_saida)


In [None]:
spark = SparkSession.builder.appName("LeituraCSV").getOrCreate()

# Definir o caminho para o arquivo CSV
caminho_csv = "/content/drive/MyDrive/ProjetoBolsa/dados/dadosTrustedCSV/movies_ids_unicos.csv"

# Ler o arquivo CSV usando o Spark
df_id = spark.read.csv(caminho_csv, header=True, inferSchema=True)

# Imprimir o DataFrame
df_id.show(5, truncate=False)

+---------+------------------------------+-------------+--------------------------+---------+-----------+
|id       |tituloPincipal                |anoLancamento|genero                    |notaMedia|numeroVotos|
+---------+------------------------------+-------------+--------------------------+---------+-----------+
|tt0000009|Miss Jerry                    |1894         |Romance                   |5.3      |200        |
|tt0000574|The Story of the Kelly Gang   |1906         |Action,Adventure,Biography|6.0      |797        |
|tt0000591|The Prodigal Son              |1907         |Drama                     |5.1      |20         |
|tt0000615|Robbery Under Arms            |1907         |Drama                     |4.3      |23         |
|tt0000679|The Fairylogue and Radio-Plays|1908         |Adventure,Fantasy         |5.2      |66         |
+---------+------------------------------+-------------+--------------------------+---------+-----------+
only showing top 5 rows



### Limpando a tabela Id_Unicos

In [None]:
# Inicializa uma SparkSession
spark = SparkSession.builder \
    .appName("Limpeza de Dados") \
    .getOrCreate()

# Caminho do arquivo CSV
nome_arquivo = '/content/drive/MyDrive/ProjetoBolsa/dados/dadosTrustedCSV/movies_ids_unicos.csv'

# Carrega os dados CSV para um DataFrame Spark
dados = spark.read.csv(nome_arquivo, header=True, inferSchema=True)

# Remove linhas com células vazias
dados_limpos = dados.na.drop()

# Coleta os dados limpos em um DataFrame Pandas
dados_limpos_pandas = dados_limpos.toPandas()

# Caminho para salvar o arquivo limpo
pasta_destino = '/content/drive/MyDrive/ProjetoBolsa/dados/dadosTrustedCSV/'
nome_arquivo_limpo = pasta_destino + 'movies_dados_id_limpo.csv'

# Salva os dados limpos como arquivo CSV usando o Pandas
dados_limpos_pandas.to_csv(nome_arquivo_limpo, index=False)

print("Linhas com células vazias removidas e arquivo salvo com sucesso!")

# Encerra a SparkSession
spark.stop()

Linhas com células vazias removidas e arquivo salvo com sucesso!


### Retirando os dados N

In [None]:
# Inicialize a SparkSession
spark = SparkSession.builder \
    .appName("Remove linhas com valor 'N'") \
    .getOrCreate()

# Caminho para o arquivo CSV de entrada
caminho_entrada = r"/content/drive/MyDrive/ProjetoBolsa/dados/dadosTrustedCSV/movies_dados_id_limpo.csv"

# Caminho para o arquivo CSV de saída (sem linhas com '\N')
caminho_saida = r"/content/drive/MyDrive/ProjetoBolsa/dados/dadosTrustedCSV/movies_pronto.csv"

# Lê o arquivo CSV como um DataFrame
df = spark.read.csv(caminho_entrada, header=True)

# Remove as linhas onde qualquer coluna contém '\N'
for coluna in df.columns:
    df = df.filter(col(coluna) != '\\N')

# Converter DataFrame do Spark para DataFrame do Pandas
df_pandas = df.toPandas()

# Salva o DataFrame resultante em outro arquivo CSV usando Pandas
df_pandas.to_csv(caminho_saida, index=False)

# Encerra a SparkSession
spark.stop()



In [None]:
spark = SparkSession.builder.appName("LeituraCSV").getOrCreate()

# Definir o caminho para o arquivo CSV
caminho_csv = "/content/drive/MyDrive/ProjetoBolsa/dados/dadosTrustedCSV/movies_romance_pronto.csv"

# Ler o arquivo CSV usando o Spark
df_limpo = spark.read.csv(caminho_csv, header=True, inferSchema=True)

# Imprimir o DataFrame
df_limpo.show(5, truncate=False)

+---------+-------------------------+-------------+-----------------------+---------+-----------+
|id       |tituloPincipal           |anoLancamento|genero                 |notaMedia|numeroVotos|
+---------+-------------------------+-------------+-----------------------+---------+-----------+
|tt0000009|Miss Jerry               |1894         |Romance                |5.3      |200        |
|tt0001175|Camille                  |1912         |Drama,Romance          |5.3      |38         |
|tt0001475|Amor fatal               |1911         |Drama,Romance          |7.3      |21         |
|tt0003442|Tess of the D'Urbervilles|1913         |Drama,Romance          |5.9      |25         |
|tt0004207|The Last Egyptian        |1914         |Adventure,Drama,Romance|5.2      |22         |
+---------+-------------------------+-------------+-----------------------+---------+-----------+
only showing top 5 rows



### Salvando em Parquet

In [17]:
from pyspark.sql import SparkSession

# Iniciar uma sessão Spark
spark = SparkSession.builder \
    .appName("Leitura de arquivo Parquet") \
    .getOrCreate()

# Ler o arquivo Parquet para um DataFrame Spark
df_spark = spark.read.parquet("/content/drive/MyDrive/ProjetoBolsa/json/jsonTrusted/movies_romance_TMDB_json.parquet")

# Exibir o esquema do DataFrame
df_spark.printSchema()

# Exibir as primeiras linhas do DataFrame
df_spark.show()

# Parar a sessão Spark
spark.stop()


root
 |-- classificacao_media: double (nullable = true)
 |-- contagem_votos: long (nullable = true)
 |-- data_lancamento: string (nullable = true)
 |-- generos: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- id_imdb: string (nullable = true)
 |-- identificacao: long (nullable = true)
 |-- orcamento: long (nullable = true)
 |-- popularidade: double (nullable = true)
 |-- receita: long (nullable = true)

+-------------------+--------------+---------------+--------------------+---------+-------------+---------+------------------+-------+
|classificacao_media|contagem_votos|data_lancamento|             generos|  id_imdb|identificacao|orcamento|      popularidade|receita|
+-------------------+--------------+---------------+--------------------+---------+-------------+---------+------------------+-------+
|                5.0|             3|     1894-10-08|  [[10749, Romance]]|tt0000009|       356151|        0|

In [None]:
#usando spark fica varios arquivos uma merda
# Iniciar a sessão Spark
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Ler um arquivo CSV para criar um DataFrame Spark
df = spark.read.csv("/content/drive/MyDrive/ProjetoBolsa/json/jsonPreTrusted/todos_filmes_romance_tmdb.json", header=True, inferSchema=True)

# Escrever o DataFrame como um arquivo Parquet
df.write.parquet("/content/drive/MyDrive/ProjetoBolsa/dados/dadosTrusted/movies_romance_limpo.parquet")

In [None]:
#usando spark fica varios arquivos uma merda
# Iniciar a sessão Spark
spark = SparkSession.builder.master("local[*]").getOrCreate()

# Ler um arquivo CSV para criar um DataFrame Spark
df = spark.read.json("/content/drive/MyDrive/ProjetoBolsa/json/jsonPreTrusted/todos_filmes_romance_tmdb.json")

# Escrever o DataFrame como um arquivo Parquet
df.write.parquet("/content/drive/MyDrive/ProjetoBolsa/json/jsonTrusted/filmes_romance_tmdb.json")

In [None]:
#usando pandas o mesmo fica numa taleba só
def csv_to_parquet(csv,parquet):
  df = pd.read_json(csv)
  df.to_parquet(parquet, index=False)
  print("Arquivo convertido com sucesso")

csv_arq="/content/drive/MyDrive/ProjetoBolsa/json/jsonPreTrusted/todos_filmes_romance_tmdb.json"
parquet_dest="/content/drive/MyDrive/ProjetoBolsa/json/jsonTrusted/filmes_romance_tmdb.json"

if __name__ == "__main__":
  csv_to_parquet(csv_arq,parquet_dest)



#### Excluindo coluna json-parquet e salvando como parquet

In [19]:
# Iniciar uma sessão Spark
spark = SparkSession.builder \
    .appName("Limpeza de colunas") \
    .getOrCreate()

# Ler o arquivo Parquet para um DataFrame Spark
df_spark = spark.read.parquet("/content/drive/MyDrive/ProjetoBolsa/json/jsonTrusted/movies_romance_TMDB_json.parquet")

# Limpar colunas (substitua 'coluna_indesejada' pelo nome da coluna que você deseja remover)
colunas_para_manter = [col for col in df_spark.columns if col != 'coluna_indesejada']
df_limpo = df_spark.select(*colunas_para_manter)

# Converter para um DataFrame Pandas
df_pandas = df_limpo.toPandas()

# Salvar o DataFrame Pandas como um arquivo Parquet
df_pandas.to_parquet("/content/drive/MyDrive/ProjetoBolsa/json/jsonRefined/movies_romance_TMDB_json.parquet")

# Parar a sessão Spark
spark.stop()



In [None]:
# Check for trailing characters in the JSON file
with open(csv_arq) as file:
    data = file.read()

if data.endswith('\n'):
    print("The file has a trailing newline character.")
    # Remove the trailing newline character
    with open(csv_arq) as file:
        data = file.read()
        file.seek(0)
        file.truncate()
        file.write(data.rstrip())
else:
    print("The file does not have a trailing newline character.")

# Convert the JSON file to Parquet format
def csv_to_parquet(json, parquet):
    df = pd.read_json(json)
    df.to_parquet(parquet, index=False)
    print("Arquivo convertido com sucesso")

csv_to_parquet(csv_arq, parquet_dest)

## Json - TMDB

In [None]:
# prompt: crir um codigo em pyspark para ler os dados de um csv numa pasta do google colab

# Criar uma SparkSession
spark = SparkSession.builder.appName("LeituraCSV").getOrCreate()

# Definir o caminho para o arquivo CSV
caminho_csv = "/content/drive/MyDrive/ProjetoBolsa/json/jsonPreTrusted/filmes_romance_tmad_tr_limpo.json"

# Ler o arquivo CSV usando o Spark
df = spark.read.json(caminho_csv)

# Imprimir o DataFrame
df.show(5, truncate=False)

+-------------------+--------------+---------------+--------------------------------------------------+---------+-------------+---------+------------+-------+
|classificacao_media|contagem_votos|data_lancamento|generos                                           |id_imdb  |identificacao|orcamento|popularidade|receita|
+-------------------+--------------+---------------+--------------------------------------------------+---------+-------------+---------+------------+-------+
|5.0                |3             |1894-10-08     |[[10749, Romance]]                                |tt0000009|356151       |0        |1.611       |0      |
|1.0                |1             |1912-01-02     |[[18, Drama]]                                     |tt0001175|282871       |0        |1.469       |0      |
|1.0                |1             |1913-09-01     |[[10749, Romance], [18, Drama]]                   |tt0003442|290616       |0        |0.745       |0      |
|4.0                |1             |1914-11-15

In [None]:
#API TMDB
api_key = "6ac09baadc310ae1df51dfe6d4f6e4ea"
#URL base do TMDB
base_url = "https://api.themoviedb.org/3/"

### Criação JSON dos dados limpos

In [None]:
def buscar_filme_por_id(api_key, movie_id):
    url = f"https://api.themoviedb.org/3/movie/{movie_id}?api_key={api_key}"
    response = requests.get(url)
    if response.status_code == 200:
        return response.json()
    else:
        return None

def salvar_json(data, filename):
    with open(filename, 'w') as f:
        json.dump(data, f, indent=4)

def processar_batch(batch_ids, api_key):
    batch_films = []
    for movie_id in batch_ids:
        film = buscar_filme_por_id(api_key, movie_id)
        if film:
            batch_films.append(film)
        else:
            print(f"Filme {movie_id} não encontrado.")
    return batch_films

def main():
    spark = SparkSession.builder \
        .appName("Processamento de Filmes") \
        .getOrCreate()

    api_key = "6ac09baadc310ae1df51dfe6d4f6e4ea"

    # Caminho para o arquivo CSV
    csv_filename = '/content/drive/MyDrive/ProjetoBolsa/dados/dadosRefined/dados_id_limpo.csv'

    # Lendo os IDs da coluna "id" do arquivo CSV usando PySpark DataFrame
    df = spark.read.option("header", "true").csv(csv_filename)
    movie_ids = [row['id'] for row in df.select("id").collect()]
    batch_size = 100
    num_batches = math.ceil(len(movie_ids) / batch_size)
    output_folder = Path('/content/drive/MyDrive/ProjetoBolsa/dados/dadosRaw/')  # Pasta de saída dos arquivos JSON

    for i in range(num_batches):
        start_idx = i * batch_size
        end_idx = min((i + 1) * batch_size, len(movie_ids))
        batch_ids = movie_ids[start_idx:end_idx]

        batch_rdd = spark.sparkContext.parallelize(batch_ids)
        batch_films_rdd = batch_rdd.map(lambda movie_id: buscar_filme_por_id(api_key, movie_id))
        batch_films = batch_films_rdd.collect()

        if batch_films:
            batch_filename = output_folder / f"dados_filmes_romance_limpos_{i+1}.json"
            salvar_json(batch_films, batch_filename)
            print(f"Detalhes dos filmes {start_idx+1}-{end_idx} foram salvos em '{batch_filename}'.")

    spark.stop()

if __name__ == "__main__":
    main()


### Json Linha Unica

In [None]:
def buscar_filme_por_id(api_key, movie_id):
    url = f"https://api.themoviedb.org/3/movie/{movie_id}?api_key={api_key}"
    response = requests.get(url)
    if response.status_code == 200:
        return response.json()
    else:
        return None

def salvar_json(data, filename):
    with open(filename, 'a') as f:
        for item in data:
            json.dump(item, f)
            f.write('\n')

def processar_batch(batch_ids, api_key):
    batch_films = []
    for movie_id in batch_ids:
        film = buscar_filme_por_id(api_key, movie_id)
        if film:
            batch_films.append(film)
        else:
            print(f"Filme {movie_id} não encontrado.")
    return batch_films

def main():
    spark = SparkSession.builder \
        .appName("Processamento de Filmes") \
        .getOrCreate()

    api_key = "6ac09baadc310ae1df51dfe6d4f6e4ea"

    # Caminho para o arquivo CSV
    csv_filename = '/content/drive/MyDrive/ProjetoBolsa/dados/dadosTrustedCSV/movies_dados_id_limpo_romance.csv'

    # Lendo os IDs da coluna "id" do arquivo CSV usando PySpark DataFrame
    df = spark.read.option("header", "true").csv(csv_filename)
    movie_ids = [row['id'] for row in df.select("id").collect()]
    batch_size = 100
    num_batches = math.ceil(len(movie_ids) / batch_size)
    output_folder = Path('/content/drive/MyDrive/ProjetoBolsa/json/jsonRaw')  # Pasta de saída dos arquivos JSON

    for i in range(num_batches):
        start_idx = i * batch_size
        end_idx = min((i + 1) * batch_size, len(movie_ids))
        batch_ids = movie_ids[start_idx:end_idx]

        batch_rdd = spark.sparkContext.parallelize(batch_ids)
        batch_films_rdd = batch_rdd.map(lambda movie_id: buscar_filme_por_id(api_key, movie_id))
        batch_films = batch_films_rdd.collect()

        if batch_films:
            batch_filename = output_folder / f"dados_filmes_romance_limpos_{i+1}.json"
            salvar_json(batch_films, batch_filename)
            print(f"Detalhes dos filmes {start_idx+1}-{end_idx} foram salvos em '{batch_filename}'.")

    spark.stop()

if __name__ == "__main__":
    main()


In [None]:
from pyspark.sql import SparkSession
import pandas as pd
import glob

def limpar_json(spark, input_folder, output_folder, keys_to_keep):
    # Listar todos os arquivos JSON na pasta de entrada
    input_files = glob.glob(input_folder + "/*.json")

    for json_filename in input_files:
        # Ler o arquivo JSON
        df = spark.read.json(json_filename)

        # Selecionar apenas as colunas desejadas
        cleaned_df = df.select(*keys_to_keep)

        # Converter o DataFrame do Spark para um DataFrame do Pandas
        cleaned_df_pandas = cleaned_df.toPandas()

        # Salvar o DataFrame limpo em um novo arquivo JSON usando o Pandas
        output_filename = json_filename.split("/")[-1].split(".")[0] + ".json"
        output_path = f"{output_folder}/{output_filename}"
        cleaned_df_pandas.to_json(output_path, orient='records', lines=True)
        print(f"Arquivo limpo salvo em: {output_path}")

# Exemplo de utilização
spark = SparkSession.builder \
    .appName("Limpeza de JSON") \
    .getOrCreate()

input_folder = '/content/drive/MyDrive/ProjetoBolsa/json/jsonTrusted'  # Pasta de entrada contendo os arquivos JSON
output_folder = '/content/drive/MyDrive/ProjetoBolsa/json/jsonPreTrusted'  # Pasta de saída para os arquivos limpos
keys_to_keep = ["budget", "genres", "id", "imdb_id", "popularity", "release_date", "revenue", "vote_average", "vote_count"]  # Chaves a serem mantidas

limpar_json(spark, input_folder, output_folder, keys_to_keep)

spark.stop()


Arquivo limpo salvo em: /content/drive/MyDrive/ProjetoBolsa/json/jsonPreTrusted/todos_filmes_romance_tmdb.json


### Dados do tmdb todos juntos

In [None]:
def buscar_filme_por_id(api_key, movie_id):
    url = f"https://api.themoviedb.org/3/movie/{movie_id}?api_key={api_key}"
    response = requests.get(url)
    if response.status_code == 200:
        return response.json()
    else:
        return None

def salvar_json(data, filename):
    with open(filename, 'w') as f:
        json.dump(data, f, indent=4)

def processar_batch(batch_ids, api_key):
    batch_films = []
    for movie_id in batch_ids:
        film = buscar_filme_por_id(api_key, movie_id)
        if film:
            batch_films.append(film)
        else:
            print(f"Filme {movie_id} não encontrado.")
    return batch_films

def main():
    spark = SparkSession.builder \
        .appName("Processamento de Filmes") \
        .getOrCreate()
    # Caminho para o arquivo CSV
    csv_filename = '/content/drive/MyDrive/ProjetoBolsa/dados/dadosTrustedCSV/dados_id_limpo.csv'

    # Lendo os IDs da coluna "id" do arquivo CSV usando PySpark DataFrame
    df = spark.read.option("header", "true").csv(csv_filename)
    movie_ids = [row['id'] for row in df.select("id").collect()]
    num_batches = math.ceil(len(movie_ids) / batch_size)
    output_folder = Path('/content/drive/MyDrive/ProjetoBolsa/json/jsonRaw/aqui_dados/')  # Pasta de saída dos arquivos JSON

    all_films = []

    for i in range(num_batches):
        start_idx = i * batch_size
        end_idx = min((i + 1) * batch_size, len(movie_ids))
        batch_ids = movie_ids[start_idx:end_idx]

        batch_rdd = spark.sparkContext.parallelize(batch_ids)
        batch_films_rdd = batch_rdd.map(lambda movie_id: buscar_filme_por_id(api_key, movie_id))
        batch_films = batch_films_rdd.collect()

        if batch_films:
            all_films.extend(batch_films)

    output_filename = output_folder / "todos_filmes_romance_limpos.csv"
    salvar_json(all_films, output_filename)
    print(f"Todos os detalhes dos filmes foram salvos em '{output_filename}'.")

    spark.stop()

if __name__ == "__main__":
    main()


### Juntando todos os arquivos da pasta

In [None]:
def buscar_filme_por_id(api_key, movie_id):
    url = f"https://api.themoviedb.org/3/movie/{movie_id}?api_key={api_key}"
    response = requests.get(url)
    if response.status_code == 200:
        return response.json()
    else:
        return None

def salvar_json(data, filename):
    with open(filename, 'w') as f:
        for filme in data:
            json.dump(filme, f)
            f.write('\n')

def main():
    spark = SparkSession.builder \
        .appName("Processamento de Filmes") \
        .getOrCreate()
    # Caminho para o arquivo CSV
    csv_filename = '/content/drive/MyDrive/ProjetoBolsa/dados/dadosTrustedCSV/movies_romance_pronto.csv'

    # Lendo os IDs da coluna "id" do arquivo CSV usando PySpark DataFrame
    df = spark.read.option("header", "true").csv(csv_filename)
    movie_ids = [row['id'] for row in df.select("id").collect()]
    output_folder = Path('/content/drive/MyDrive/ProjetoBolsa/json/jsonTrusted/')  # Pasta de saída do arquivo JSON

    all_films = []

    for movie_id in movie_ids:
        film = buscar_filme_por_id(api_key, movie_id)
        if film:
            all_films.append(film)
        else:
            print(f"Filme {movie_id} não encontrado.")

    output_filename = output_folder / "todos_filmes_romance_tmdb.json"
    salvar_json(all_films, output_filename)
    print(f"Todos os detalhes dos filmes foram salvos em '{output_filename}'.")

    spark.stop()

if __name__ == "__main__":
    main()


### Convertendo os atributos

In [None]:
import pandas as pd

def renomear_atributos(spark, arquivo_entrada, arquivo_saida):
    # Lendo o arquivo JSON como DataFrame Spark
    df = spark.read.json(arquivo_entrada)

    # Mapeamento dos nomes antigos para os novos nomes dos atributos
    mapeamento = {
        "budget": "orcamento",
        "genres": "generos",
        "id": "identificacao",
        "imdb_id": "id_imdb",
        "popularity": "popularidade",
        "release_date": "data_lancamento",
        "revenue": "receita",
        "vote_average": "classificacao_media",
        "vote_count": "contagem_votos"
    }

    # Renomeando os atributos
    for antigo_nome, novo_nome in mapeamento.items():
        if antigo_nome in df.columns:
            df = df.withColumnRenamed(antigo_nome, novo_nome)

    # Convertendo DataFrame Spark para DataFrame pandas
    df_pandas = df.toPandas()

    # Salvando o DataFrame pandas como arquivo JSON
    df_pandas.to_json(arquivo_saida, orient='records')

# Criando uma sessão Spark
spark = SparkSession.builder \
    .appName("Renomear Atributos") \
    .getOrCreate()

# Exemplo de uso
arquivo_entrada = "/content/drive/MyDrive/ProjetoBolsa/json/jsonPreTrusted/todos_filmes_romance_tmdb.json"
arquivo_saida = "/content/drive/MyDrive/ProjetoBolsa/json/jsonPreTrusted/filmes_romance_tmdb_traducao.json"

renomear_atributos(spark, arquivo_entrada, arquivo_saida)

# Parando a sessão Spark
spark.stop()



### Limpando NA

In [None]:
import pandas as pd

def limpar_dados_nulos(spark, arquivo_entrada, arquivo_saida):
    # Lendo o arquivo JSON como DataFrame Spark
    df = spark.read.json(arquivo_entrada)

    # Convertendo DataFrame Spark para DataFrame pandas
    df_pandas = df.toPandas()

    # Removendo linhas com valores nulos em qualquer coluna
    df_limpo = df_pandas.dropna()

    # Salvando o DataFrame limpo como arquivo JSON
    df_limpo.to_json(arquivo_saida, orient='records')

# Criando uma sessão Spark
spark = SparkSession.builder \
    .appName("Limpar Dados Nulos") \
    .getOrCreate()

# Exemplo de uso
arquivo_entrada = "/content/drive/MyDrive/ProjetoBolsa/json/jsonPreTrusted/todos_filmes_romance_tmdb.json"
arquivo_saida = "/content/drive/MyDrive/ProjetoBolsa/json/jsonPreTrusted/filmes_romance_tmad_limpo.json"

limpar_dados_nulos(spark, arquivo_entrada, arquivo_saida)

# Parando a sessão Spark
spark.stop()


### Salvando json como parquet

In [None]:
import pandas as pd

def converter_json_para_parquet(arquivo_entrada, arquivo_saida):
    # Lendo o arquivo JSON como DataFrame pandas
    df = pd.read_json(arquivo_entrada)

    # Salvando o DataFrame como arquivo Parquet
    df.to_parquet(arquivo_saida)

# Exemplo de uso
arquivo_entrada_json = "/content/drive/MyDrive/ProjetoBolsa/json/jsonPreTrusted/filmes_romance_tmad_tr_limpo.json"
arquivo_saida_parquet = "/content/drive/MyDrive/ProjetoBolsa/json/jsonTrusted/movies_romance_TMDB_json.parquet"

converter_json_para_parquet(arquivo_entrada_json, arquivo_saida_parquet)


In [16]:
# Inicialize uma sessão Spark
spark = SparkSession.builder \
    .appName("Consulta CSV com PySpark") \
    .getOrCreate()

# Caminho do arquivo CSV
caminho_arquivo = "/content/drive/MyDrive/ProjetoBolsa/json/jsonPreTrusted/filmes_romance_tmad_tr_limpo.json"

# Carregue o arquivo CSV em um DataFrame Spark
df = spark.read.json(caminho_arquivo)

# Exiba o esquema do DataFrame
df.printSchema()
df.createOrReplaceTempView("dados_csv")
consulta_sql = "SELECT id_imdb FROM dados_csv"
resultado = spark.sql(consulta_sql)
resultado.show()

# Encerre a sessão Spark
spark.stop()


root
 |-- classificacao_media: double (nullable = true)
 |-- contagem_votos: long (nullable = true)
 |-- data_lancamento: string (nullable = true)
 |-- generos: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: string (containsNull = true)
 |-- id_imdb: string (nullable = true)
 |-- identificacao: long (nullable = true)
 |-- orcamento: long (nullable = true)
 |-- popularidade: double (nullable = true)
 |-- receita: long (nullable = true)

+---------+
|  id_imdb|
+---------+
|tt0000009|
|tt0001175|
|tt0003442|
|tt0004545|
|tt0004825|
|tt0004838|
|tt0005059|
|tt0005179|
|tt0005196|
|tt0005214|
|tt0005393|
|tt0005553|
|tt0005592|
|tt0005604|
|tt0005802|
|tt0006140|
|tt0006417|
|tt0006456|
|tt0006481|
|tt0006614|
+---------+
only showing top 20 rows



## Inicializando o S3

In [10]:
AWS_ACCESS_KEY_ID=""  # Substitua '--' pelo ID da chave de acesso da AWS
AWS_SECRET_ACCESS_KEY=""  # Substitua '--' pela chave secreta de acesso da AWS
AWS_SESSION_TOKEN=""  # Substitua '--' pelo token de sessão da AWS, se necessário
AWS_REGION='us-east-1'

# iniciar sessão
s3_client = boto3.client(
    's3',
    aws_access_key_id=AWS_ACCESS_KEY_ID,
    aws_secret_access_key=AWS_SECRET_ACCESS_KEY,
    aws_session_token=AWS_SESSION_TOKEN,
    region_name=AWS_REGION
)

### Enviando para o S3

##### Camada Raw

In [None]:
# Configurações
BUCKET_NAME = ''
LOCAL_CSV_FOLDER = ''

for csv_file in ['']:  # Substitua pelos nomes dos arquivos CSV na sua pasta local

    s3_key = f"Raw/{csv_file.split('.')[0]}/{datetime.now().strftime('%Y')}/{datetime.now().strftime('%m')}/{datetime.now().strftime('%d')}/{csv_file}"

    s3_client.upload_file(f"{LOCAL_CSV_FOLDER}/{csv_file}", BUCKET_NAME, s3_key)

    print(f"Arquivo {csv_file} enviado para {s3_key} no S3.")


In [None]:
BUCKET_NAME = ''
LOCAL_CSV_FOLDER = ''

# Lista todos os arquivos CSV na pasta LOCAL_CSV_FOLDER
csv_files = glob.glob(f"{LOCAL_CSV_FOLDER}/*.json")

# Itera sobre os arquivos CSV
for csv_file in csv_files:
    # Extrai o nome do arquivo sem a extensão
    csv_file_name = csv_file.split('/')[-1].split('.')[0]

    # Constrói a chave S3
    s3_key = f"Raw/{csv_file.split('.')[0]}/{datetime.datetime.now().strftime('%Y-%m-%d')}/{csv_file}"

    # Faz o upload do arquivo para o Amazon S3
    s3_client.upload_file(csv_file, BUCKET_NAME, s3_key)

    # Imprime uma mensagem indicando o envio bem-sucedido
    print(f"Arquivo {csv_file} enviado para {s3_key} no S3.")


#### Camada Trusted

In [12]:
# Configurações
BUCKET_NAME = ''
LOCAL_CSV_FOLDER = ''

for csv_file in ['movies_limpo.parquet','movies_romance_limpo.parquet']:  # Substitua pelos nomes dos arquivos CSV na sua pasta local

    s3_key = f"Trusted/{csv_file.split('.')[0]}/{datetime.datetime.now().strftime('%Y-%m-%d')}/{csv_file}"

    s3_client.upload_file(LOCAL_CSV_FOLDER + '/' + csv_file, BUCKET_NAME, s3_key)

    print(f"Arquivo {csv_file} enviado para {s3_key} no S3.")

Arquivo movies_limpo.parquet enviado para Trusted/movies_limpo/2024-03-09/movies_limpo.parquet no S3.
Arquivo movies_romance_limpo.parquet enviado para Trusted/movies_romance_limpo/2024-03-09/movies_romance_limpo.parquet no S3.


#### Camada  Refined

In [22]:
# Configurações
BUCKET_NAME = ''
LOCAL_CSV_FOLDER = ''

for csv_file in ['movies_romance_limpo.parquet','movies_limpo.parquet']:  # Substitua pelos nomes dos arquivos CSV na sua pasta local

    s3_key = f"Refined/{csv_file.split('.')[0]}/{datetime.datetime.now().strftime('%Y-%m-%d')}/{csv_file}"

    s3_client.upload_file(LOCAL_CSV_FOLDER + '/' + csv_file, BUCKET_NAME, s3_key)

    print(f"Arquivo {csv_file} enviado para {s3_key} no S3.")

Arquivo movies_romance_limpo.parquet enviado para Refined/movies_romance_limpo/2024-03-09/movies_romance_limpo.parquet no S3.
Arquivo movies_limpo.parquet enviado para Refined/movies_limpo/2024-03-09/movies_limpo.parquet no S3.
