In [0]:
from pyspark.sql.functions import split
from pyspark.sql.functions import lit, col, column, expr, udf, when
from pyspark.sql.functions import datediff, to_date
from pyspark.sql.types import *
from pyspark.sql.functions import *
from datetime import datetime
from functools import reduce
import pyspark.sql.functions as F
from pyspark.sql.functions import current_date
from pyspark.sql.types import IntegerType,BooleanType,DateType
from pyspark.sql.functions import from_unixtime, unix_timestamp
from pyspark.sql.functions import split


In [0]:
path_dataset = "/FileStore/tables/OriginaisNetflix.parquet"

In [0]:
file_type = "parquet"
infer_schema = "true"
first_row_is_header = "true"
delimiter = ";"

df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(path_dataset)\

df.show(5)
#display(df)

+--------------------+----------------+----------------+---------+--------------------+-------------+--------------+----------+---------+---------+-------+------+-----+--------+--------------------+
|               Title|           Genre|     GenreLabels| Premiere|             Seasons|SeasonsParsed|EpisodesParsed|    Length|MinLength|MaxLength| Status|Active|Table|Language|         dt_inclusao|
+--------------------+----------------+----------------+---------+--------------------+-------------+--------------+----------+---------+---------+-------+------+-----+--------+--------------------+
|      House of Cards| Political drama| political,drama| 1-Feb-13|6 seasons, 73 epi...|            6|            73|42–59 min.|       42|       59|  Ended|     0|Drama| English|2021-03-16T21:20:...|
|       Hemlock Grove| Horror/thriller| horror,thriller|19-Apr-13|3 seasons, 33 epi...|            3|            33|45–58 min.|       45|       58|  Ended|     0|Drama| English|2021-03-16T21:20:...|
|Oran

In [0]:
df.printSchema()

root
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- GenreLabels: string (nullable = true)
 |-- Premiere: string (nullable = true)
 |-- Seasons: string (nullable = true)
 |-- SeasonsParsed: long (nullable = true)
 |-- EpisodesParsed: long (nullable = true)
 |-- Length: string (nullable = true)
 |-- MinLength: long (nullable = true)
 |-- MaxLength: long (nullable = true)
 |-- Status: string (nullable = true)
 |-- Active: long (nullable = true)
 |-- Table: string (nullable = true)
 |-- Language: string (nullable = true)
 |-- dt_inclusao: string (nullable = true)



In [0]:
# 1. Transformar os campos "Premiere" e "dt_inclusao" de string para datetime.

df1 = df.withColumn("Premiere1_Dia", split(col("Premiere"), "-").getItem(0))\
.withColumn("Premiere2_Mes", split(col("Premiere"), "-").getItem(1))\
.withColumn("Premiere3_Ano", split(col("Premiere"), "-").getItem(2))

df_1 = df1.withColumn('Premiere2_Mes', 
                      when(df1.Premiere2_Mes.endswith('Jan'),regexp_replace(df1.Premiere2_Mes,'Jan','01')) \
                     .when(df1.Premiere2_Mes.endswith('Feb'),regexp_replace(df1.Premiere2_Mes,'Feb','02')) \
                     .when(df1.Premiere2_Mes.endswith('Mar'),regexp_replace(df1.Premiere2_Mes,'Mar','03')) \
                     .when(df1.Premiere2_Mes.endswith('Apr'),regexp_replace(df1.Premiere2_Mes,'Apr','04')) \
                     .when(df1.Premiere2_Mes.endswith('May'),regexp_replace(df1.Premiere2_Mes,'May','05')) \
                     .when(df1.Premiere2_Mes.endswith('Jun'),regexp_replace(df1.Premiere2_Mes,'Jun','06')) \
                     .when(df1.Premiere2_Mes.endswith('Jul'),regexp_replace(df1.Premiere2_Mes,'Jul','07')) \
                     .when(df1.Premiere2_Mes.endswith('Aug'),regexp_replace(df1.Premiere2_Mes,'Aug','08')) \
                     .when(df1.Premiere2_Mes.endswith('Sep'),regexp_replace(df1.Premiere2_Mes,'Sep','09')) \
                     .when(df1.Premiere2_Mes.endswith('Oct'),regexp_replace(df1.Premiere2_Mes,'Oct','10')) \
                     .when(df1.Premiere2_Mes.endswith('Nov'),regexp_replace(df1.Premiere2_Mes,'Nov','11')) \
                     .when(df1.Premiere2_Mes.endswith('Dec'),regexp_replace(df1.Premiere2_Mes,'Dec','12')).otherwise(df1.Premiere2_Mes))

df_11 = df_1.withColumn("FullPremiere",concat(col("Premiere1_Dia"),lit('-'),col("Premiere2_Mes"),lit('-'),col("Premiere3_Ano")))

#df_111 = df_11.withColumn("FullPremiere", F.to_timestamp('Date_Time', 'dd/MM/yyyy hh:mm:ss a'))

#df_111 = df_11.withColumn('FullPremiere', to_timestamp(current_timestamp(), "MM-dd-yyyy hh mm a"))


df_111 = df_11.withColumn("Premiere",col("Premiere").cast(DateType())) \
    .withColumn("dt_inclusao",col("dt_inclusao").cast(DateType())) \
    .withColumn("FullPremiere",col("FullPremiere").cast(DateType()))

#df_111 = df_11.withColumn("FullPremiere",to_timestamp(col("FullPremiere")))

df_111.show(2)
#display(df_111)


+--------------+---------------+---------------+--------+--------------------+-------------+--------------+----------+---------+---------+------+------+-----+--------+-----------+-------------+-------------+-------------+------------+
|         Title|          Genre|    GenreLabels|Premiere|             Seasons|SeasonsParsed|EpisodesParsed|    Length|MinLength|MaxLength|Status|Active|Table|Language|dt_inclusao|Premiere1_Dia|Premiere2_Mes|Premiere3_Ano|FullPremiere|
+--------------+---------------+---------------+--------+--------------------+-------------+--------------+----------+---------+---------+------+------+-----+--------+-----------+-------------+-------------+-------------+------------+
|House of Cards|Political drama|political,drama|    null|6 seasons, 73 epi...|            6|            73|42–59 min.|       42|       59| Ended|     0|Drama| English| 2021-03-16|            1|           02|           13|        null|
| Hemlock Grove|Horror/thriller|horror,thriller|    null|3 s

In [0]:
df_111.printSchema()

root
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- GenreLabels: string (nullable = true)
 |-- Premiere: date (nullable = true)
 |-- Seasons: string (nullable = true)
 |-- SeasonsParsed: long (nullable = true)
 |-- EpisodesParsed: long (nullable = true)
 |-- Length: string (nullable = true)
 |-- MinLength: long (nullable = true)
 |-- MaxLength: long (nullable = true)
 |-- Status: string (nullable = true)
 |-- Active: long (nullable = true)
 |-- Table: string (nullable = true)
 |-- Language: string (nullable = true)
 |-- dt_inclusao: date (nullable = true)
 |-- Premiere1_Dia: string (nullable = true)
 |-- Premiere2_Mes: string (nullable = true)
 |-- Premiere3_Ano: string (nullable = true)
 |-- FullPremiere: date (nullable = true)



In [0]:
# 2. Ordenar os dados por ativos e gênero de forma decrescente, 0 = inativo e 1 = ativo, todos com número 1 devem aparecer primeiro.

df2 = df_11.sort(col("Active").desc(),col("Genre").desc())
df2.show(2)
#display(df2)

+--------------------+--------------------+--------------------+---------+--------------------+-------------+--------------+------+---------+---------+-------+------+--------+--------+--------------------+-------------+-------------+-------------+------------+
|               Title|               Genre|         GenreLabels| Premiere|             Seasons|SeasonsParsed|EpisodesParsed|Length|MinLength|MaxLength| Status|Active|   Table|Language|         dt_inclusao|Premiere1_Dia|Premiere2_Mes|Premiere3_Ano|FullPremiere|
+--------------------+--------------------+--------------------+---------+--------------------+-------------+--------------+------+---------+---------+-------+------+--------+--------+--------------------+-------------+-------------+-------------+------------+
|Dance & Sing with...|chrildrens musica...|chrildrens,musica...|18-May-18|1 season, 11 epis...|            1|            11|2 min.|        2|        2|Pending|     1|Children| English|2021-03-16T21:20:...|           1

In [0]:
df2.printSchema()

root
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- GenreLabels: string (nullable = true)
 |-- Premiere: string (nullable = true)
 |-- Seasons: string (nullable = true)
 |-- SeasonsParsed: long (nullable = true)
 |-- EpisodesParsed: long (nullable = true)
 |-- Length: string (nullable = true)
 |-- MinLength: long (nullable = true)
 |-- MaxLength: long (nullable = true)
 |-- Status: string (nullable = true)
 |-- Active: long (nullable = true)
 |-- Table: string (nullable = true)
 |-- Language: string (nullable = true)
 |-- dt_inclusao: string (nullable = true)
 |-- Premiere1: string (nullable = true)
 |-- Premiere2: string (nullable = true)
 |-- Premiere3: string (nullable = true)
 |-- FullPremiere: string (nullable = true)



In [0]:
# 3. Remover linhas duplicadas e trocar o resultado das linhas que tiverem a coluna "Seasons" de "TBA" para "a ser anunciado".

df3 = df2.dropDuplicates()
print("Distinct Seasons: " + str(df3.count()))
order = df3.sort(col("Active").desc(),col("Genre").desc())
order.show(2)
#display(order)

Distinct Seasons: 358
+--------------------+--------------------+--------------------+---------+--------------------+-------------+--------------+------+---------+---------+-------+------+--------+--------+--------------------+-------------+-------------+-------------+------------+
|               Title|               Genre|         GenreLabels| Premiere|             Seasons|SeasonsParsed|EpisodesParsed|Length|MinLength|MaxLength| Status|Active|   Table|Language|         dt_inclusao|Premiere1_Dia|Premiere2_Mes|Premiere3_Ano|FullPremiere|
+--------------------+--------------------+--------------------+---------+--------------------+-------------+--------------+------+---------+---------+-------+------+--------+--------+--------------------+-------------+-------------+-------------+------------+
|Dance & Sing with...|chrildrens musica...|chrildrens,musica...|18-May-18|1 season, 11 epis...|            1|            11|2 min.|        2|        2|Pending|     1|Children| English|2021-03-16T

In [0]:
order.printSchema()

root
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- GenreLabels: string (nullable = true)
 |-- Premiere: string (nullable = true)
 |-- Seasons: string (nullable = true)
 |-- SeasonsParsed: long (nullable = true)
 |-- EpisodesParsed: long (nullable = true)
 |-- Length: string (nullable = true)
 |-- MinLength: long (nullable = true)
 |-- MaxLength: long (nullable = true)
 |-- Status: string (nullable = true)
 |-- Active: long (nullable = true)
 |-- Table: string (nullable = true)
 |-- Language: string (nullable = true)
 |-- dt_inclusao: string (nullable = true)
 |-- Premiere1_Dia: string (nullable = true)
 |-- Premiere2_Mes: string (nullable = true)
 |-- Premiere3_Ano: string (nullable = true)
 |-- FullPremiere: string (nullable = true)



In [0]:
order1 = order.withColumn("Seasons", when(order.Seasons == "TBA","a ser anunciado").otherwise(order.Seasons))
order1.show(2)
#display(order1)


+--------------------+--------------------+--------------------+---------+--------------------+-------------+--------------+------+---------+---------+-------+------+--------+--------+--------------------+-------------+-------------+-------------+------------+
|               Title|               Genre|         GenreLabels| Premiere|             Seasons|SeasonsParsed|EpisodesParsed|Length|MinLength|MaxLength| Status|Active|   Table|Language|         dt_inclusao|Premiere1_Dia|Premiere2_Mes|Premiere3_Ano|FullPremiere|
+--------------------+--------------------+--------------------+---------+--------------------+-------------+--------------+------+---------+---------+-------+------+--------+--------+--------------------+-------------+-------------+-------------+------------+
|Dance & Sing with...|chrildrens musica...|chrildrens,musica...|18-May-18|1 season, 11 epis...|            1|            11|2 min.|        2|        2|Pending|     1|Children| English|2021-03-16T21:20:...|           1

In [0]:
# Filtro para identificar volume modificado.

filtro = order1.filter(order1.Seasons == "a ser anunciado")
filtro.show(2)
#display(filtro)

+----------+--------------------+--------------------+---------+---------------+-------------+--------------+------+---------+---------+-------+------+----------+--------+--------------------+-------------+-------------+-------------+------------+
|     Title|               Genre|         GenreLabels| Premiere|        Seasons|SeasonsParsed|EpisodesParsed|Length|MinLength|MaxLength| Status|Active|     Table|Language|         dt_inclusao|Premiere1_Dia|Premiere2_Mes|Premiere3_Ano|FullPremiere|
+----------+--------------------+--------------------+---------+---------------+-------------+--------------+------+---------+---------+-------+------+----------+--------+--------------------+-------------+-------------+-------------+------------+
|True Tunes|chrildrens musica...|chrildrens,musica...|12-Jul-19|a ser anunciado|            0|             0|   TBA|        0|        0|Pending|     1|  Children| English|2021-03-16T21:20:...|           12|           07|           19|    12-07-19|
| Exhibi

In [0]:
order1.printSchema()

root
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- GenreLabels: string (nullable = true)
 |-- Premiere: string (nullable = true)
 |-- Seasons: string (nullable = true)
 |-- SeasonsParsed: long (nullable = true)
 |-- EpisodesParsed: long (nullable = true)
 |-- Length: string (nullable = true)
 |-- MinLength: long (nullable = true)
 |-- MaxLength: long (nullable = true)
 |-- Status: string (nullable = true)
 |-- Active: long (nullable = true)
 |-- Table: string (nullable = true)
 |-- Language: string (nullable = true)
 |-- dt_inclusao: string (nullable = true)
 |-- Premiere1_Dia: string (nullable = true)
 |-- Premiere2_Mes: string (nullable = true)
 |-- Premiere3_Ano: string (nullable = true)
 |-- FullPremiere: string (nullable = true)



In [0]:
# 4. Criar uma coluna nova chamada "Data de Alteração" e dentro dela um timestamp.

df4 = order1.withColumn("Data de Alteração", to_timestamp(current_timestamp(),"MM-dd-yyyy hh mm a"))\
      .withColumnRenamed("Data de Alteração","Data_de_Alteração")
#df4.show(1,truncate=False)
df4.show(2)
#display(df4)

+--------------------+--------------------+--------------------+---------+--------------------+-------------+--------------+------+---------+---------+-------+------+--------+--------+--------------------+-------------+-------------+-------------+------------+--------------------+
|               Title|               Genre|         GenreLabels| Premiere|             Seasons|SeasonsParsed|EpisodesParsed|Length|MinLength|MaxLength| Status|Active|   Table|Language|         dt_inclusao|Premiere1_Dia|Premiere2_Mes|Premiere3_Ano|FullPremiere|   Data_de_Alteração|
+--------------------+--------------------+--------------------+---------+--------------------+-------------+--------------+------+---------+---------+-------+------+--------+--------+--------------------+-------------+-------------+-------------+------------+--------------------+
|Dance & Sing with...|chrildrens musica...|chrildrens,musica...|18-May-18|1 season, 11 epis...|            1|            11|2 min.|        2|        2|Pen

In [0]:
df4.printSchema()

root
 |-- Title: string (nullable = true)
 |-- Genre: string (nullable = true)
 |-- GenreLabels: string (nullable = true)
 |-- Premiere: string (nullable = true)
 |-- Seasons: string (nullable = true)
 |-- SeasonsParsed: long (nullable = true)
 |-- EpisodesParsed: long (nullable = true)
 |-- Length: string (nullable = true)
 |-- MinLength: long (nullable = true)
 |-- MaxLength: long (nullable = true)
 |-- Status: string (nullable = true)
 |-- Active: long (nullable = true)
 |-- Table: string (nullable = true)
 |-- Language: string (nullable = true)
 |-- dt_inclusao: string (nullable = true)
 |-- Premiere1_Dia: string (nullable = true)
 |-- Premiere2_Mes: string (nullable = true)
 |-- Premiere3_Ano: string (nullable = true)
 |-- FullPremiere: string (nullable = true)
 |-- Data_de_Alteração: timestamp (nullable = true)



In [0]:
# 5. Trocar os nomes das colunas de inglês para português, exemplo: "Title" para "Título" (com acentuação).

oldColumns = df4.schema.names
newColumns = ["Título","Gênero","GenreLabels","Pré estreia","Temporadas","SeasonsParsed","Episódios analisados"
              ,"Comprimento","Comprimento mínimo","Comprimento máximo","Status","Ativo","Mesa","Língua","dt_inclusao"
              ,"Premiere1","Premiere2","Premiere3","FullPremiere","Data_de_Alteração"]
df11 = reduce(lambda df4, colunas: df4.withColumnRenamed(oldColumns[colunas], newColumns[colunas]), range(len(oldColumns)), df4)
df11.show(2)
#display(df11)



+--------------------+--------------------+--------------------+-----------+--------------------+-------------+--------------------+-----------+------------------+------------------+-------+-----+--------+-------+--------------------+---------+---------+---------+------------+--------------------+
|              Título|              Gênero|         GenreLabels|Pré estreia|          Temporadas|SeasonsParsed|Episódios analisados|Comprimento|Comprimento mínimo|Comprimento máximo| Status|Ativo|    Mesa| Língua|         dt_inclusao|Premiere1|Premiere2|Premiere3|FullPremiere|   Data_de_Alteração|
+--------------------+--------------------+--------------------+-----------+--------------------+-------------+--------------------+-----------+------------------+------------------+-------+-----+--------+-------+--------------------+---------+---------+---------+------------+--------------------+
|Dance & Sing with...|chrildrens musica...|chrildrens,musica...|  18-May-18|1 season, 11 epis...|      

In [0]:
df11.printSchema()

root
 |-- Título: string (nullable = true)
 |-- Gênero: string (nullable = true)
 |-- GenreLabels: string (nullable = true)
 |-- Pré estreia: string (nullable = true)
 |-- Temporadas: string (nullable = true)
 |-- SeasonsParsed: long (nullable = true)
 |-- Episódios analisados: long (nullable = true)
 |-- Comprimento: string (nullable = true)
 |-- Comprimento mínimo: long (nullable = true)
 |-- Comprimento máximo: long (nullable = true)
 |-- Status: string (nullable = true)
 |-- Ativo: long (nullable = true)
 |-- Mesa: string (nullable = true)
 |-- Língua: string (nullable = true)
 |-- dt_inclusao: string (nullable = true)
 |-- Premiere1: string (nullable = true)
 |-- Premiere2: string (nullable = true)
 |-- Premiere3: string (nullable = true)
 |-- FullPremiere: string (nullable = true)
 |-- Data_de_Alteração: timestamp (nullable = true)



In [0]:
# 6. Testar e verificar se existe algum erro de processamento do spark e identificar onde pode ter ocorrido o erro.



In [0]:
data = df11.registerTempTable("data")
output = spark.sql("SELECT * FROM data")
output.show(2)
#display(output)

+--------------------+--------------------+--------------------+-----------+--------------------+-------------+--------------------+-----------+------------------+------------------+-------+-----+--------+-------+--------------------+---------+---------+---------+------------+--------------------+
|              Título|              Gênero|         GenreLabels|Pré estreia|          Temporadas|SeasonsParsed|Episódios analisados|Comprimento|Comprimento mínimo|Comprimento máximo| Status|Ativo|    Mesa| Língua|         dt_inclusao|Premiere1|Premiere2|Premiere3|FullPremiere|   Data_de_Alteração|
+--------------------+--------------------+--------------------+-----------+--------------------+-------------+--------------------+-----------+------------------+------------------+-------+-----+--------+-------+--------------------+---------+---------+---------+------------+--------------------+
|Dance & Sing with...|chrildrens musica...|chrildrens,musica...|  18-May-18|1 season, 11 epis...|      

In [0]:
output.write.format("csv").mode("overwrite").option("sep","\t").save("/opt/intellij/spark_datagree/data/my-tsv-file.csv")

In [0]:
 # 7. Criar apenas 1 . csv com as seguintes colunas que foram nomeadas anteriormente "Title, Genre, Seasons, Premiere, Language, Active, Status, dt_inclusao, Data de Alteração" as
# colunas devem estar em português com header e separadas por ";".

output.write.format("csv").mode("overwrite").option("sep","\t").save("/opt/intellij/spark_datagree/data/my-tsv-file.csv")

output.write.format("csv").save("/FileStore/tables/mydata_3.csv").option("sep", ";").option("header", "true")
