In [0]:
from pyspark.sql.functions import col, coalesce, substring, split, regexp_extract, when, year, month, dayofmonth, from_unixtime, regexp_replace
 
inputPath = "/mnt/rawleonardo/rawzone/" # Defini o local da entrada de dados
 
schema = spark.read.json("/mnt/Schema/Schema.json").schema # Defini o Schema para realização do processo de Streaming
 
ListaPos = r":\)|:\]|:P|:p|:s|:d|:D|:\}|;\)|;\]|;P|;p|;s|;d|;D|;\}|=\)|=\]|=P|=p|=d|=D|=\}|=S|:-\)|:-\]|:-P|:-p|:-s|:-d|:-D|:-\}|;-\)|;-\]|;-P|;-p|;-s|;-d|;-D|;-\}|=-\)|=-\]|=-P|=-p|=-d|=-D|=-\}|=-S|: \)|\(:|\( :|:- \)|😂|❤️|😍|🤣|😊|🙏|💕|😘|👍|😅|👏|😁|🔥|💖|😆|💪|😉|👌|🤗|😎|😇|🌹|🎉|💞|✌️|✨|😌|🌸|🙌|😋|😏|🙂|🤩|😄|😀|💯|🤭|❣️|😜|🙋|🤪|👊|💃|😚|😝|🙃|🍀|🌷|😻|✅|🌈|😈|🤘|✔️|💐|🎊|💘|🌺"
 
ListaNeg = r":\(|:\/|:c|:\\|:C|:\[|;\(|;\/|;c|;\\|;C|;\[|=\(|=\/|=c|=\\|=C|=\[|:-\(|:-\/|:-c|:-\\|:-C|:-\[|;-\(|;-\/|;-c|;-\\|;-C|;-\[|=-\(|=-\/|=-c|=-\\|=-C|=-\[|😭|😢|🤔|🙄|😔|🤦|😱|😒|😪|😑|😞|😩|😡|😥|😳|✋|😴|😬|😓|😣|🏃|☹️|😠|🥺|🤬"
 
Simbolos = ListaPos + ListaNeg
 
Nomes = "Bolsonaro|bolsonaro|bonoro|Bonoro|bozo|Bozo|Jair|jair|bozonaro|Bozonaro|jairbolsonaro|Jairbolsonaro"
 
rm_links = "(https?:\/\/(?:www\.|(?!www))[a-zA-Z0-9][a-zA-Z0-9-]+[a-zA-Z0-9]\.[^\s]{2,}|www\.[a-zA-Z0-9][a-zA-Z0-9-]+[a-zA-Z0-9]\.[^\s]{2,}|https?:\/\/(?:www\.|(?!www))[a-zA-Z0-9]+\.[^\s]{2,}|www\.[a-zA-Z0-9]+\.[^\s]{2,})"
 
Dataframe = (
  spark
    .readStream
    .schema(schema)    
    .json(inputPath)
)
 
query = (  
 Dataframe
  .select("id", coalesce(col("extended_tweet.full_text"), col("retweeted_status.extended_tweet.full_text"), col("text")).alias("text"), "timestamp_ms", "source", "retweeted_status.retweet_count") # Selecionar Colunas
  .withColumn("text", regexp_replace(col("text"), rm_links, "")) # Remove os links da coluna "text"
  .withColumn("source", split(col("source"), ">").getItem(1)) # Limpa caracteres indesejados da coluna "source"
  .withColumn("source", split(col("source"), "<").getItem(0)) # Limpa caracteres indesejados da coluna "source"
  .withColumn("timestamp_ms", from_unixtime(col("timestamp_ms")/1000)).withColumnRenamed("timestamp_ms", "tweet_data") # Transforma a coluna "timestamp_ms" para o formato "YYYY/MM/DD/mm/ss"
  .withColumn("Simbolo", regexp_extract(col("text"), f'{ListaPos}|{ListaNeg}', 0)) # Criando a coluna simbolo e adicionando a primeira ocorrência dos emojis
  .withColumn("Sentimento", when(col("Simbolo").rlike(ListaPos), "Positivo").when(col("Simbolo").rlike(ListaNeg), "Negativo").otherwise("Neutro")) # Cria a coluna sentimento e classifica como positivo/negativo
  .withColumn("Ano", year("tweet_data")).withColumn("Mes", month("tweet_data")).withColumn("Dia", dayofmonth("tweet_data")) # Cria 3 colunas "Ano" "Mes" "Dia" para fazer o partitionBy  
  .filter(col("text").rlike(Nomes)) # Filtra pelo nome do Presidente
  .filter(col("Simbolo").rlike(Simbolos)) # Filtra apenas tweets com emojis
  .withColumn("retweet_count", when(col("retweet_count").isNull(), 0).otherwise(col("retweet_count"))) # Substitui os valores nulos por 0
  .repartition("Ano", "Mes", "Dia") # Faz o repartition pela coluna "Dia"
    
  #Escrever o arquivo Parquet 
  .writeStream
  .trigger(processingTime="10 minutes") # Salva um arquivo em um espaço de tempo de 15 minutos  
  .partitionBy("Ano", "Mes", "Dia") # Particiona por Ano/Mes/Dia
  .format("parquet")
  .option("path", "/mnt/rawleonardo/refzone/Twitter2021")
  .option("checkpointLocation", "/mnt/rawleonardo/refzone/Twitter2021/checkpoint")
  .start()
)

In [0]:
# Juntar todos os arquivos em um Parquet só, facilitando a leitura em PowerBI
 
df = spark.read.parquet("/mnt/rawleonardo/refzone/Twitter2021/Ano=2021")
df = df.coalesce(1).write.parquet("mnt/rawleonardo/refzone/Twitter2021/Coalesce2021")

In [0]:
# Exclusões

%fs rm /mnt/rawleonardo/refzone/Twitter2021/ -r

In [0]:
# Exclusões

%rm /dbfs/mnt/rawleonardo/rawzone/*.json