In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col, when, regexp_replace, from_unixtime
from pyspark.sql.functions import to_date, col, date_format,current_timestamp

spark = SparkSession.builder.appName("MeuAppSpark").getOrCreate()
df_occupations = spark.read.csv('/FileStore/tables/occupation.csv', inferSchema = True)
df_movies = spark.read.csv('/FileStore/tables/movies.csv', inferSchema = True)
df_ratings = spark.read.csv('/FileStore/tables/ratings.csv', inferSchema = True,sep=";")
df_users = spark.read.csv('/FileStore/tables/users.csv', header=False)

In [0]:
#Retirando informações distintas MOCK encontradas no dataset
from pyspark.sql.functions import col
df_users = df_users.filter(~(col("_c0").contains("MOCK") | col("_c0").contains("mock")))

df_ratings.show() #Não possui nome da coluna
df_movies.show() #Não possui nome da coluna
df_occupations.show()
df_users.show()


+------------------+
|               _c0|
+------------------+
|1;1193;5;978300760|
| 1;661;3;978302109|
| 1;914;3;978301968|
|1;3408;4;978300275|
|1;2355;5;978824291|
|1;1197;3;978302268|
|1;1287;5;978302039|
|1;2804;5;978300719|
| 1;594;4;978302268|
| 1;919;4;978301368|
| 1;595;5;978824268|
| 1;938;4;978301752|
|1;2398;4;978302281|
|1;2918;4;978302124|
|1;1035;5;978301753|
|1;2791;4;978302188|
|1;2687;3;978824268|
|1;2018;4;978301777|
|1;3105;5;978301713|
|1;2797;4;978302039|
+------------------+
only showing top 20 rows

+--------------------+
|                 _c0|
+--------------------+
|1;Toy Story (1995...|
|2;Jumanji (1995);...|
|3;Grumpier Old Me...|
|4;Waiting to Exha...|
|5;Father of the B...|
|6;Heat (1995);Act...|
|7;Sabrina (1995);...|
|8;Tom and Huck (1...|
|9;Sudden Death (1...|
|10;GoldenEye (199...|
|11;American Presi...|
|12;Dracula: Dead ...|
|13;Balto (1995);A...|
|14;Nixon (1995);D...|
|15;Cutthroat Isla...|
|16;Casino (1995);...|
|17;Sense and Sens...|
|18;Four R

In [0]:
# Estruturando as colunas e as selecionando para aprecer nas tabelas

from pyspark.sql.functions import col
df_users = df_users.filter(~(col("_c0").contains("MOCK") | col("_c0").contains("mock")))
df_users = df_users \
    .withColumn("id_usuario", split(col("_c0"), ";")[0]) \
    .withColumn("Sexo", split(col("_c0"), ";")[1]) \
    .withColumn("Idade", split(col("_c0"), ";")[2]) \
    .withColumn("ocupacao_id", split(col("_c0"), ";")[3]) \
    .withColumn("Codigo Postal", split(col("_c0"), ";")[4])

colunas_desejadas = ["id_usuario","Sexo","Idade","ocupacao_id","Codigo Postal"]

df_users = df_users.select(colunas_desejadas)
df_users.show()

df_ratings = df_ratings \
    .withColumn("usuario_id",split(col("_c0"),";")[0])\
    .withColumn("movie_id", split(col("_c0"),";")[1])\
    .withColumn("Classificacao",split(col("_c0"),";")[2])\
    .withColumn("Data/hora",split(col("_c0"),";")[3])

colunas_desejRatings = ["usuario_id","movie_id","Classificacao","Data/hora"] 
df_ratings = df_ratings.select(colunas_desejRatings)
df_ratings.show()

df_movies = df_movies\
    .withColumn("id_movie",split(col("_c0"),";")[0])\
    .withColumn("Titulo",split(col("_c0"),";")[1])\
    .withColumn("Generos",split(col("_c0"),";")[2])

colunas_desMovie = ["id_movie","Titulo","Generos"]
df_movies = df_movies.select(colunas_desMovie)
df_movies.show()

df_occupations = df_occupations\
    .withColumn("id_ocu",split(col("_c0"),";")[0])\
    .withColumn("NomeOcupacao",split(col("_c0"),";")[1])\
    
colunas_desOcup = ["id_ocu","NomeOcupacao"]
df_occupations = df_occupations.select(colunas_desOcup)
df_movies.show()

+----------+----+-----+-----------+-------------+
|id_usuario|Sexo|Idade|ocupacao_id|Codigo Postal|
+----------+----+-----+-----------+-------------+
|         1|   F|    1|         10|        48067|
|         2|   M|   56|         16|        70072|
|         3|   M|   25|         15|        55117|
|         4|   M|   45|          7|        02460|
|         5|   M|   25|         20|        55455|
|         6|   F|   50|          9|        55117|
|         7|   M|   35|          1|        06810|
|         8|   M|   25|         12|        11413|
|         9|   M|   25|         17|        61614|
|        10|   F|   35|          1|        95370|
|        11|   F|   25|          1|        04093|
|        12|   M|   25|         12|        32793|
|        13|   M|   45|          1|        93304|
|        14|   M|   35|          0|        60126|
|        15|   M|   25|          7|        22903|
|          |   F|   35|          0|        20670|
|        17|   M|   50|          1|        95350|


In [0]:
#Alterações propostas no desafio
df_movies = df_movies.withColumn('id_movie', regexp_replace(col('id_movie'), '"', '')) #Replace valores "" 
df_users = df_users.withColumn("Sexo", when(col("Sexo") == "M","Masculino").when(col("Sexo")== "F","Feminino").otherwise(col("Sexo"))) #Alterando a coluna SEXO para nomes Feminino e Masculino
df_users.show()

+----------+---------+-----+-----------+-------------+
|id_usuario|     Sexo|Idade|ocupacao_id|Codigo Postal|
+----------+---------+-----+-----------+-------------+
|         1| Feminino|    1|         10|        48067|
|         2|Masculino|   56|         16|        70072|
|         3|Masculino|   25|         15|        55117|
|         4|Masculino|   45|          7|        02460|
|         5|Masculino|   25|         20|        55455|
|         6| Feminino|   50|          9|        55117|
|         7|Masculino|   35|          1|        06810|
|         8|Masculino|   25|         12|        11413|
|         9|Masculino|   25|         17|        61614|
|        10| Feminino|   35|          1|        95370|
|        11| Feminino|   25|          1|        04093|
|        12|Masculino|   25|         12|        32793|
|        13|Masculino|   45|          1|        93304|
|        14|Masculino|   35|          0|        60126|
|        15|Masculino|   25|          7|        22903|
|         

In [0]:
#Junção
# Realize a primeira junção entre df_users, df_occupations,df_ratings and df_movie
df_table = df_users.join(df_occupations, df_users.ocupacao_id == df_occupations.id_ocu, 'inner')
df_table = df_table.join(df_ratings, df_table.id_usuario == df_ratings.usuario_id, 'inner')
df_table = df_table.join(df_movies, df_movies.id_movie == df_table.movie_id,'inner')

#Limpando os dados de acordo com os requisitos da coluna classificação
df_table = df_table.filter(col("CLASSIFICACAO").isNotNull())

#Selecionando as colunas
columnsSelect= ['id_usuario','Sexo','Idade','Codigo Postal','id_ocu','NomeOcupacao','Classificacao','id_movie','Titulo','Generos','Data/hora',]
df_table = df_table.select(columnsSelect)
df_table.show()

+----------+--------+-----+-------------+------+------------+-------------+--------+--------------------+--------------------+---------+
|id_usuario|    Sexo|Idade|Codigo Postal|id_ocu|NomeOcupacao|Classificacao|id_movie|              Titulo|             Generos|Data/hora|
+----------+--------+-----+-------------+------+------------+-------------+--------+--------------------+--------------------+---------+
|         1|Feminino|    1|        48067|    10|K-12 student|            5|    1193|One Flew Over the...|               Drama|978300760|
|         1|Feminino|    1|        48067|    10|K-12 student|            3|     661|James and the Gia...|Animation|Childre...|978302109|
|         1|Feminino|    1|        48067|    10|K-12 student|            3|     914| My Fair Lady (1964)|     Musical|Romance|978301968|
|         1|Feminino|    1|        48067|    10|K-12 student|            4|    3408|Erin Brockovich (...|               Drama|978300275|
|         1|Feminino|    1|        48067|

In [0]:
# - A informação de data de notas de filmes deve estar em um formato legível (YYYY-MM-DD) ;
# - Um novo campo com a data da execução do processo deve ser gerado para garantir a execução histórica do processo quando o mesmo for para produção;  
# - Além disso, é necessário garantir que não haverá nenhum registro com a nota de filme nula, pois os mesmos não agregam valor à base;

df_estruturado = df_table.withColumn("data/hora", from_unixtime(col("Data/hora"))) # transformando a coluna de segundos para timestamp

df_estruturado = df_estruturado.withColumn("dataformatada", date_format(col("Data/hora"),"yyyy-MM-dd")) # Formatando a data para aparecer somente DATA

df_estruturado = df_estruturado.withColumn("hora_execucao", current_timestamp()) #inserindo um current_ para registrar a data e hora de execução de um processo 

df_estruturado = df_estruturado.na.drop(subset=["Classificacao"]) #Removendo valores nulos da coluna de ratings cornforme solicitado

# Exiba o DataFrame resultante
colunas_selecionadas = ['id_usuario','Sexo','Idade','Codigo Postal','id_ocu','NomeOcupacao','Classificacao','id_movie','Titulo','Generos','dataformatada','hora_execucao']
df_estruturado= df_estruturado.select(colunas_selecionadas)
df_estruturado.show()

+----------+--------+-----+-------------+------+------------+-------------+--------+--------------------+--------------------+-------------+--------------------+
|id_usuario|    Sexo|Idade|Codigo Postal|id_ocu|NomeOcupacao|Classificacao|id_movie|              Titulo|             Generos|dataformatada|       hora_execucao|
+----------+--------+-----+-------------+------+------------+-------------+--------+--------------------+--------------------+-------------+--------------------+
|         1|Feminino|    1|        48067|    10|K-12 student|            5|    1193|One Flew Over the...|               Drama|   2000-12-31|2023-09-14 17:42:...|
|         1|Feminino|    1|        48067|    10|K-12 student|            3|     661|James and the Gia...|Animation|Childre...|   2000-12-31|2023-09-14 17:42:...|
|         1|Feminino|    1|        48067|    10|K-12 student|            3|     914| My Fair Lady (1964)|     Musical|Romance|   2000-12-31|2023-09-14 17:42:...|
|         1|Feminino|    1| 

In [0]:


#Fazendo o particionamento dos dados conforme o Genero
column_df_rg = ['Sexo','id_usuario','id_movie','Classificacao','dataformatada']
finally_table_R = df_estruturado.select(column_df_rg)

column_df_movie = ['Sexo','id_movie','Titulo','Generos']
finally_table_M = df_estruturado.select(column_df_movie)

column_df_user = ['Sexo','id_usuario','Idade','id_ocu','Codigo Postal']
finally_table_U = df_estruturado.select(column_df_user)

column_df_oc = ['Sexo','id_ocu','NomeOcupacao']
finally_table_O = df_estruturado.select(column_df_oc)

finally_table_R.write.csv("/Users/BlueShift/Desktop/DesafioAnaSpark/arquivo_R.csv", header=True,mode="overwrite")
finally_table_M.write.csv("/Users/BlueShift/Desktop/DesafioAnaSpark/arquivo_M.csv", header=True, mode="overwrite")
finally_table_U.write.csv("/Users/BlueShift/Desktop/DesafioAnaSpark/arquivo_U.csv", header=True, mode="overwrite")
finally_table_O.write.csv("/Users/BlueShift/Desktop/DesafioAnaSpark/arquivo_O.csv", header=True, mode="overwrite")



In [0]:
#Selecionando as colunas para o database final para as analises das informações
selectcolumns = ['Sexo','NomeOcupacao','Classificacao','Generos','dataformatada']
df_Analise =  df_estruturado.select(selectcolumns)
df_Analise.show()

+--------+------------+-------------+--------------------+-------------+
|    Sexo|NomeOcupacao|Classificacao|             Generos|dataformatada|
+--------+------------+-------------+--------------------+-------------+
|Feminino|K-12 student|            5|               Drama|   2000-12-31|
|Feminino|K-12 student|            3|Animation|Childre...|   2000-12-31|
|Feminino|K-12 student|            3|     Musical|Romance|   2000-12-31|
|Feminino|K-12 student|            4|               Drama|   2000-12-31|
|Feminino|K-12 student|            5|                    |   2001-01-06|
|Feminino|K-12 student|            3|                    |   2000-12-31|
|Feminino|K-12 student|            5|Action|Adventure|...|   2000-12-31|
|Feminino|K-12 student|            5|                    |   2000-12-31|
|Feminino|K-12 student|            4|Animation|Childre...|   2000-12-31|
|Feminino|K-12 student|            4|                    |   2000-12-31|
|Feminino|K-12 student|            5|Animation|Chil

In [0]:
# Registrar o DataFrame como uma tabela temporária
df_Analise.createOrReplaceTempView("tabela_temporaria")

# 1- Qual o total de registros contidos na nova base?
resultado = spark.sql("SELECT count('Classificacao') AS QTD_CLASSIFICACAO FROM tabela_temporaria")
resultado.show()


+-----------------+
|QTD_CLASSIFICACAO|
+-----------------+
|           999002|
+-----------------+



In [0]:
#2-Quantos filmes distintos foram avaliados por mulheres e quantos foram avaliados por homens?
Filmes = spark.sql("SELECT SEXO,COUNT('CLASSIFICACAO') AS QTD_AVALIACAO FROM TABELA_TEMPORARIA GROUP BY SEXO ORDER BY COUNT('CLASSIFICACAO') DESC")
Filmes.show()

+---------+-------------+
|     SEXO|QTD_AVALIACAO|
+---------+-------------+
|Masculino|       752827|
| Feminino|       246175|
+---------+-------------+



In [0]:
#3 -Quantos filmes de ação foram avaliados por mulheres?
filmes_acao = spark.sql("SELECT SEXO, COUNT('Classificacao'),Generos as media FROM TABELA_TEMPORARIA WHERE Generos LIKE '%Action%' AND SEXO = 'Feminino'"
                        "group by SEXO, GENEROS")
#filmes_acao.show()

filmesAvaliados = spark.sql("SELECT SEXO, COUNT('Classificacao') as QTD_AVALIACOES FROM TABELA_TEMPORARIA WHERE Generos LIKE '%Action%' AND SEXO = 'Feminino'"
                        "group by SEXO")
filmesAvaliados.show()

+--------+--------------+
|    SEXO|QTD_AVALIACOES|
+--------+--------------+
|Feminino|         34353|
+--------+--------------+



In [0]:
#4-Quantos filmes de romance foram avaliados por homens?
filmeRomance = spark.sql("SELECT SEXO, COUNT(CLASSIFICACAO) AS QTD_AVALIACAO FROM TABELA_TEMPORARIA WHERE SEXO = 'Masculino' GROUP BY SEXO")
filmeRomance.show()

+---------+-------------+
|     SEXO|QTD_AVALIACAO|
+---------+-------------+
|Masculino|       752827|
+---------+-------------+



In [0]:
#5-Qual é o top 10 filmes de Thriller mais bem avaliados por programadores em 12 de dezembro de 2000?
top10 = spark.sql("SELECT CLASSIFICACAO,GENEROS, NOMEOCUPACAO, dataformatada as DATA FROM TABELA_TEMPORARIA WHERE"
                   "(NOMEOCUPACAO='programmer'"
                  " AND GENEROS like '%Thriller%') AND dataformatada ='2000-12-12'"
                  "ORDER BY Classificacao desc  limit 10")
top10.show()

+-------------+--------------------+------------+----------+
|CLASSIFICACAO|             GENEROS|NOMEOCUPACAO|      DATA|
+-------------+--------------------+------------+----------+
|            5|Comedy|Romance|Th...|  programmer|2000-12-12|
|            5|Crime|Film-Noir|M...|  programmer|2000-12-12|
|            5|     Action|Thriller|  programmer|2000-12-12|
|            5|Action|Adventure|...|  programmer|2000-12-12|
|            5|     Action|Thriller|  programmer|2000-12-12|
|            5|Action|Romance|Th...|  programmer|2000-12-12|
|            5|Crime|Drama|Thriller|  programmer|2000-12-12|
|            4|Action|Sci-Fi|Thr...|  programmer|2000-12-12|
|            4|     Action|Thriller|  programmer|2000-12-12|
|            4|     Action|Thriller|  programmer|2000-12-12|
+-------------+--------------------+------------+----------+



In [0]:
# Brincando com os dados
# Quero saber a quantidade de classificações acima de 5, por generos, pelo sexo feminino com ocupação Writer
analisegeral = spark.sql(
    "SELECT SEXO,GENEROS, COUNT(Classificacao) AS Qtd_Classificacoes_Distintas,NOMEOCUPACAO " 
    "FROM TABELA_TEMPORARIA " 
    "WHERE (GENEROS LIKE 'Adventure%' AND GENEROS LIKE '%Comedy%') " 
    "AND SEXO IN (SELECT DISTINCT SEXO FROM TABELA_TEMPORARIA WHERE SEXO = 'Feminino')"
    "AND NOMEOCUPACAO = 'writer'" 
    "GROUP BY GENEROS,SEXO,NOMEOCUPACAO " +
    "HAVING COUNT(Classificacao) >5 "
    "ORDER BY 3 ASC"
)
analisegeral.show(analisegeral.count(),truncate=False)

+--------+---------------------------------------------+----------------------------+------------+
|SEXO    |GENEROS                                      |Qtd_Classificacoes_Distintas|NOMEOCUPACAO|
+--------+---------------------------------------------+----------------------------+------------+
|Feminino|Adventure|Children's|Comedy|Musical          |6                           |writer      |
|Feminino|Adventure|Animation|Children's|Comedy|Fantasy|7                           |writer      |
|Feminino|Adventure|Comedy|Musical                     |8                           |writer      |
|Feminino|Adventure|Animation|Children's|Comedy|Musical|9                           |writer      |
|Feminino|Adventure|Children's|Comedy                  |18                          |writer      |
|Feminino|Adventure|Comedy|Sci-Fi                      |27                          |writer      |
|Feminino|Adventure|Children's|Comedy|Fantasy          |27                          |writer      |
|Feminino|