<h2>Importando bibliotecas</h2>

In [26]:
import findspark
# init() is deliberately called before `import pyspark`; presumably it makes the
# local Spark installation importable — confirm against the findspark docs.
findspark.init()
import pyspark


In [27]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

In [28]:
import pandas as pd
import pyspark.pandas as ps

In [None]:
import os
import sys

# Directory holding the project's MinIO helper scripts (provides `move_files`).
# It can be overridden with the MINIO_SCRIPTS_DIR environment variable so the
# notebook is not tied to a single machine; the fallback is the location this
# notebook originally hard-coded.
minio_scripts_dir = os.environ.get(
    'MINIO_SCRIPTS_DIR',
    '/home/acsantos/Documents/Facens_Architecture-for-Data-Processing/scripts/minio',
)
# Avoid appending the same entry again every time the cell is re-run.
if minio_scripts_dir not in sys.path:
    sys.path.append(minio_scripts_dir)
from move_files import fn_move_files

<h2> Criando sessão do Spark + configurações de conexão com o bucket </h2>

In [29]:
# Start (or reuse) the Spark session used by every cell below.
spark = (
    SparkSession.builder
    .appName('Steam API - Tratamento')
    .getOrCreate()
)

In [30]:
def load_config(spark_context: SparkContext, endpoint: str = 'http://localhost:9000') -> None:
    """Configure the Hadoop S3A connector of ``spark_context`` for an S3-compatible store.

    Args:
        spark_context: Context whose Hadoop configuration is updated in place.
        endpoint: URL of the S3-compatible service. Defaults to the local
            endpoint this notebook originally hard-coded.

    Returns:
        None. The settings are written directly to the Hadoop configuration.

    No access keys are set here; the configured credentials provider is the
    AWS SDK class named ``EnvironmentVariableCredentialsProvider``.
    """
    # `_jsc` is a private PySpark attribute; look the configuration up once
    # instead of once per setting.
    hadoop_conf = spark_context._jsc.hadoopConfiguration()

    # Keep the SSL flag consistent with the scheme of the endpoint URL.
    ssl_enabled = 'true' if endpoint.lower().startswith('https://') else 'false'

    settings = {
        'fs.s3a.aws.credentials.provider': 'com.amazonaws.auth.EnvironmentVariableCredentialsProvider',
        'fs.s3a.path.style.access': 'true',
        'fs.s3a.impl': 'org.apache.hadoop.fs.s3a.S3AFileSystem',
        'fs.s3a.endpoint': endpoint,
        'fs.s3a.connection.ssl.enabled': ssl_enabled,
    }
    for key, value in settings.items():
        hadoop_conf.set(key, value)
    
# Apply the S3A settings to the context behind the active Spark session.
load_config(spark.sparkContext)


<h2> Lendo dados do bucket </h2>

In [31]:
# Load every object under the bronze bucket's Steam topic prefix as JSON.
# multiLine=True lets a single JSON record span several lines.
df = spark.read.json(
    's3a://bronze/topics/steam/*',
    multiLine=True,
)

                                                                                

In [32]:
# Inspect the column names and types inferred from the JSON files.
df.printSchema()

root
 |-- appid: string (nullable = true)
 |-- comment_count: long (nullable = true)
 |-- language: string (nullable = true)
 |-- last_played: long (nullable = true)
 |-- num_games_owned: long (nullable = true)
 |-- num_reviews: long (nullable = true)
 |-- playtime_forever: long (nullable = true)
 |-- playtime_last_two_weeks: long (nullable = true)
 |-- received_for_free: boolean (nullable = true)
 |-- recommendationid: string (nullable = true)
 |-- review: string (nullable = true)
 |-- steam_purchase: boolean (nullable = true)
 |-- steamid: string (nullable = true)
 |-- timestamp_created: long (nullable = true)
 |-- timestamp_updated: long (nullable = true)
 |-- voted_up: boolean (nullable = true)
 |-- votes_funny: long (nullable = true)
 |-- votes_up: long (nullable = true)
 |-- weighted_vote_score: string (nullable = true)
 |-- written_during_early_access: boolean (nullable = true)



In [33]:
# Number of records loaded from the bronze bucket.
df.count()

195

In [34]:
# Disabled: optional conversion to the pandas-on-Spark API. The cells below use
# the regular Spark DataFrame API, so `df` is left as it is.
#df = df.to_pandas_on_spark()

In [35]:
# Peek at a single row; truncate=True shortens long cell values such as the review text.
df.show(1, truncate=True)

+-------+-------------+--------+-----------+---------------+-----------+----------------+-----------------------+-----------------+----------------+--------------------+--------------+-----------------+-----------------+-----------------+--------+-----------+--------+--------------------+---------------------------+
|  appid|comment_count|language|last_played|num_games_owned|num_reviews|playtime_forever|playtime_last_two_weeks|received_for_free|recommendationid|              review|steam_purchase|          steamid|timestamp_created|timestamp_updated|voted_up|votes_funny|votes_up| weighted_vote_score|written_during_early_access|
+-------+-------------+--------+-----------+---------------+-----------+----------------+-----------------------+-----------------+----------------+--------------------+--------------+-----------------+-----------------+-----------------+--------+-----------+--------+--------------------+---------------------------+
|1055540|            0| english| 1648286540|  

### Convertendo de timestamp UNIX para datetime

In [36]:
from pyspark.sql import functions as f
from pyspark.sql import types as t
from datetime import datetime

In [37]:
# Preview only: the result is not assigned, so `df` itself is left unchanged.
# Each UNIX-epoch column is cast to a timestamp and rendered as a "yyyy-MM-dd" string.
(
    df
    .withColumn('last_played', f.date_format(df['last_played'].cast(t.TimestampType()), "yyyy-MM-dd"))
    .withColumn('timestamp_created', f.date_format(df['timestamp_created'].cast(t.TimestampType()), "yyyy-MM-dd"))
    .withColumn('timestamp_updated', f.date_format(df['timestamp_updated'].cast(t.TimestampType()), "yyyy-MM-dd"))
)

DataFrame[appid: string, comment_count: bigint, language: string, last_played: string, num_games_owned: bigint, num_reviews: bigint, playtime_forever: bigint, playtime_last_two_weeks: bigint, received_for_free: boolean, recommendationid: string, review: string, steam_purchase: boolean, steamid: string, timestamp_created: string, timestamp_updated: string, voted_up: boolean, votes_funny: bigint, votes_up: bigint, weighted_vote_score: string, written_during_early_access: boolean]

In [38]:
# Turn the three UNIX-epoch columns into dates (epoch -> timestamp -> date);
# the time-of-day part is dropped by to_date.
df2 = (
    df
    .withColumn('last_played', f.to_date(df['last_played'].cast(t.TimestampType())))
    .withColumn('timestamp_created', f.to_date(df['timestamp_created'].cast(t.TimestampType())))
    .withColumn('timestamp_updated', f.to_date(df['timestamp_updated'].cast(t.TimestampType())))
)

In [39]:
# Convert the date columns back to timestamp type; since they were already
# reduced to dates, the resulting timestamps sit at midnight.
df2 = (
    df2
    .withColumn("last_played", f.to_timestamp(df2.last_played))
    .withColumn("timestamp_created", f.to_timestamp(df2.timestamp_created))
    .withColumn("timestamp_updated", f.to_timestamp(df2.timestamp_updated))
)

    

### Removendo possíveis registros duplicados

In [40]:
# Remove rows that are identical across every column.
# NOTE(review): this runs before the column selection, so rows that differ only
# in columns dropped later are all kept — confirm that is intended.
df3 = df2.dropDuplicates()

### Filtrando colunas de interesse

In [41]:
# Keep only the columns used downstream.
df4 = df3.select(
    'appid',
    'recommendationid',
    'steamid',
    'language',
    'last_played',
    'num_games_owned',
    'playtime_forever',
    'review',
    'voted_up',
    'votes_up',
    'timestamp_created',
)

In [42]:
# Preview the selected columns (show() prints the first rows of the DataFrame).
df4.show()

+-------+----------------+-----------------+--------+-------------------+---------------+----------------+--------------------+--------+--------+-------------------+
|  appid|recommendationid|          steamid|language|        last_played|num_games_owned|playtime_forever|              review|voted_up|votes_up|  timestamp_created|
+-------+----------------+-----------------+--------+-------------------+---------------+----------------+--------------------+--------+--------+-------------------+
|1055540|       110515037|76561198997429067| english|2022-02-03 00:00:00|            162|             163|If you like short...|    true|       0|2022-02-18 00:00:00|
|1055540|       104648492|76561199160365183| english|2022-03-30 00:00:00|            106|             393|i was absolutely ...|    true|       0|2021-11-29 00:00:00|
|1055540|       106793767|76561198295246849| english|2021-12-29 00:00:00|            196|              75|         Bird Gaming|    true|       0|2021-12-29 00:00:00|
|105

In [43]:
# Rename the application id column; the write cell partitions by 'app_id'.
df4 = df4.withColumnRenamed(
    'appid',
    'app_id',
)

## Salvando dados tratados no Silver

In [44]:
# Alternative kept for reference: overwrite the dataset instead of appending to it.
#df4.write.partitionBy('appid').mode('overwrite').parquet('s3a://silver/steam_reviews/reviews.parquet')

# Append the treated reviews to the silver bucket as Parquet, partitioned by app_id.
# NOTE(review): with mode('append'), re-running this cell on the same input
# would write the same rows again — confirm the input files are always moved first.
(
    df4.write
    .partitionBy('app_id')
    .mode('append')
    .parquet('s3a://silver/steam_reviews/reviews.parquet')
)

## Movendo arquivos que foram lidos para a pasta de processados

In [47]:
# Hand the consumed input prefix to the project's move helper; presumably it
# relocates the objects inside the 'bronze' bucket — verify against move_files.
# NOTE(review): the read cell globs 'topics/steam/*' while only 'partition=0/'
# is passed here — confirm no other partitions exist.
fn_move_files(
    bucket='bronze',
    sourcePath='topics/steam/partition=0/',
    destinationPath='processed_files/steam/',
)