In [1]:
from pyspark.sql import SparkSession
import os
spark = SparkSession.builder.appName('weekly_spark').getOrCreate()
#spark

In [2]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, ArrayType
from pyspark.sql.functions import length, col, count, expr, monotonically_increasing_id, lit

reviews_schema = StructType([
    StructField("App ID", IntegerType(), True),
    StructField("Review", StringType(), True),
    StructField("Voted Up", StringType(), True)
])

weekly_top_news_schema = StructType([
    StructField("appnews", StructType([
        StructField("appid", IntegerType()),
        StructField("newsitems", ArrayType(StructType([
            StructField("gid", StringType()),
            StructField("title", StringType()),
            StructField("url", StringType()),
            StructField("is_external_url", BooleanType()),
            StructField("author", StringType()),
            StructField("contents", StringType()),
            StructField("feedlabel", StringType()),
            StructField("date", IntegerType()),
            StructField("feedname", StringType()),
            StructField("feed_type", IntegerType()),
            StructField("appid", IntegerType()),
            StructField("tags", ArrayType(StringType()))
        ]))),
        StructField("count", IntegerType())
    ]))
])

top_sellers_schema = StructType([
    StructField("Rank", IntegerType(), True),
    StructField("Game Name", StringType(), True),
    StructField("Free to Play", IntegerType(), True),
    ])

top_sellers_appids_schema = StructType([
    StructField("App ID", IntegerType(), True),
])


In [3]:
WEEKLY_DATA_PATH = r'../data/weekly_data/'
reviews_path = os.path.join(WEEKLY_DATA_PATH, 'reviews/')
FILE_DATE = None

try:
    csv_files = [f for f in os.listdir(reviews_path) if f.endswith('.csv')]
    if csv_files:
        csv_file = csv_files[0]
        FILE_DATE = csv_file[0].split('.')[0].split('_')[0]
        csv_file_path = os.path.join(reviews_path, csv_file)
        most_daily_played = spark.read.csv(csv_file_path, header=True, schema=reviews_schema)
    else:
        print("No CSV files found in the 'reviews_path' directory.")
except Exception as e:
    print("An error occurred while reading the CSV file:", e)

In [4]:
# Cleaning the data
most_daily_played = most_daily_played.na.drop(subset=["Review", "Voted Up", "App ID"])
most_daily_played = most_daily_played.filter(length(col("Review")) >= 2)

# Counting the number of positive and negative reviews
counted_reviews = most_daily_played.groupBy("App ID").pivot("Voted Up", ["pos", "neg"]).agg(count("*").alias("count"))

# Seprarating the positive and negative reviews
neg_reviews_df = most_daily_played.filter(most_daily_played["Voted Up"] == "neg")
pos_reviews_df = most_daily_played.filter(most_daily_played["Voted Up"] == "pos")

neg_reviews_df = neg_reviews_df.withColumn("FILE_DATE", lit(FILE_DATE))
pos_reviews_df = pos_reviews_df.withColumn("FILE_DATE", lit(FILE_DATE))
counted_reviews = counted_reviews.withColumn("FILE_DATE", lit(FILE_DATE))

In [5]:
#Top_sellers
WEEKLY_TOP_SELLERS_PATH = WEEKLY_DATA_PATH + r'top_sellers/'
files = os.listdir(WEEKLY_TOP_SELLERS_PATH)

FILE_DATE = None
try:
    csv_file1 = [f for f in files if f.endswith('weekly_top_sellers.csv')]
    FILE_DATE = csv_file1[0].split('.')[0].split('_')[0]
    top_sellers_games = spark.read.csv(
        WEEKLY_TOP_SELLERS_PATH + csv_file1[0],
        header=True,
        schema=top_sellers_schema  
    )
    csv_file2 = [f for f in files if f.endswith('weekly_top_sellers_appIds.csv')]
    FILE_DATE = csv_file2[0].split('.')[0].split('_')[0]
    top_sellers_appids = spark.read.csv(
        WEEKLY_TOP_SELLERS_PATH + csv_file2[0],
        header=True,
        schema=top_sellers_appids_schema  
    )
    top_sellers_appids = top_sellers_appids.withColumn(
    "Rank",(monotonically_increasing_id() + 1).cast("int"))
    top_sellers = top_sellers_games.join(top_sellers_appids, on=["Rank"], how="inner")
    top_sellers = top_sellers.withColumn("FILE_DATE", lit(FILE_DATE))
    #top_sellers.show()   

except Exception as e:
    print("An error occurred while reading the CSV file:", e)

In [6]:
neg_reviews_path = r"../saved_data/weekly_data/neg_reviews"
pos_reviews_path = r"../saved_data/weekly_data/pos_reviews"
counted_reviews_path = r"../saved_data/weekly_data/counted_reviews"
top_sellers_path = r"../saved_data/weekly_data/top_sellers"

# Save the DataFrame as CSV
neg_reviews_df.write.format("csv").mode("overwrite").option("header", "true").save(neg_reviews_path)
pos_reviews_df.write.format("csv").mode("overwrite").option("header", "true").save(pos_reviews_path)
counted_reviews.write.format("csv").mode("overwrite").option("header", "true").save(counted_reviews_path)
top_sellers.write.format("csv").mode("overwrite").option("header", "true").save(top_sellers_path)

AttributeError: 'str' object has no attribute 'write'

In [None]:
# need to work on news

In [None]:
merged_df = None
WEEKLY_TOP_NEWS_PATH = WEEKLY_DATA_PATH + r'news/'
files = os.listdir(WEEKLY_TOP_NEWS_PATH)
try:
    json_files = [pos_json for pos_json in files if pos_json.endswith('.json')]
    for file in json_files:
        steam_game_news = spark.read.json(
            WEEKLY_TOP_NEWS_PATH + file,
            multiLine=True,
            schema = weekly_top_news_schema     
        )
        if merged_df is None:
            merged_df = steam_game_news
        else:
            merged_df = merged_df.union(steam_game_news)
except:
    print('No json files found')


In [None]:
file

In [None]:
temp = WEEKLY_TOP_NEWS_PATH + file

In [None]:
temp

In [None]:
if os.path.exists(temp):
    print(f"The JSON file at {temp} exists.")
else:
    print(f"The JSON file at {temp} does not exist.")

In [None]:
merged_df.show(5)

In [None]:
from pyspark.sql.functions import explode, split
# Extract the 'appid' value as a string from the 'appnews' column
split_df = merged_df.withColumn("appid", expr("cast(appnews.appid as string)"))

# Show the resulting DataFrame
split_df.show(truncate=False)


In [None]:
steam_game_news.show()

In [None]:
#reviews
WEEKLY_TOP_10_REVIEWS_PATH = WEEKLY_DATA_PATH + r'reviews/'
files = os.listdir(WEEKLY_DATA_PATH)
try:
    csv_file = [f for f in files if f.endswith('.txt')]
    for file in csv_file:
        reviews = spark.read.text(
            WEEKLY_TOP_10_REVIEWS_PATH + file,
        )
        reviews.cache()
        reviews.show()
        break
except Exception as e:
    print("An error occurred while reading the CSV file:", e)