In [None]:
# `sc` is not defined anywhere in this notebook; presumably it is the SparkContext
# injected by the PySpark kernel (TODO confirm). Evaluating it here just displays it,
# which doubles as a check that the Spark session is available before running the rest.
sc

# Delete empty folders (no reviews)

The received data contains many empty folders for timestamps at which no new reviews arrived. In this section we remove them so that later processing is easier.

In [None]:
import os
import shutil

In [None]:
def get_folder_size(folder):
    """Return the combined size, in bytes, of every file found under ``folder``.

    The tree is traversed recursively with ``os.walk``. Only files contribute
    to the total, so a folder without any files (or a path that ``os.walk``
    cannot enter) yields 0.
    """
    return sum(
        os.path.getsize(os.path.join(current_dir, file_name))
        for current_dir, _subdirs, file_names in os.walk(folder)
        for file_name in file_names
    )

In [None]:
def delete_small_folders(parent_dir, size_limit):
    """Delete every folder below ``parent_dir`` whose files total less than ``size_limit`` bytes.

    Folder sizes come from ``get_folder_size``. ``parent_dir`` itself is never
    removed; only the directories found inside it are candidates. Each removal
    is announced on stdout and performed with ``shutil.rmtree``, so it is
    permanent.
    """
    # Bottom-up traversal: nested folders are evaluated (and possibly removed)
    # before the folder that contains them.
    for current_root, subdir_names, _file_names in os.walk(parent_dir, topdown=False):
        for subdir_name in subdir_names:
            candidate = os.path.join(current_root, subdir_name)
            if get_folder_size(candidate) >= size_limit:
                continue
            print(f"Deleting folder: {candidate}")
            shutil.rmtree(candidate)

In [None]:
# Per the notebook author, a folder that received no reviews holds 8 bytes of files.
# WARNING: this cell is destructive. delete_small_folders removes matching folders
# with shutil.rmtree, so point `path` at the right run folder before executing.
# `path` is reused by the later cells that build and export the per-run dataframe.
path = r"C:\Users\jef-w\Desktop\Uni\KUL\Year_1\Advanced_Analytics\Assignments\Assignment_3\spark\notebooks\Raw_Data\2023-04-13-20-39"
size_limit = 9  # Threshold in bytes; the comparison is strict (<), so 8-byte folders are removed
delete_small_folders(path, size_limit)

# Combine Received files into single dataframe

Here we take all the individual received text files and combine them into a single, easy-to-use CSV.
We also clean newlines out of the review text so that it can easily be converted to CSV.

In [None]:
import os
import shutil
from pyspark.sql import functions as F

In [None]:
#Function for getting the first dataframe of a folder, so we can concatenate it with the rest
def initialize_dataframe(folder_path):
    """Load the first subfolder found under ``folder_path`` as a DataFrame.

    The tree is walked with ``os.walk``; the first directory that has at least
    one subdirectory supplies the subfolder to read. That subfolder is parsed
    with ``spark.read.json`` and every newline in ``review_text`` is replaced
    by a space.

    Returns the cleaned DataFrame, or ``None`` when ``folder_path`` contains no
    subfolders at all.
    """
    for current_dir, subdir_names, _file_names in os.walk(folder_path):
        if not subdir_names:
            continue
        first_subfolder = os.path.join(current_dir, subdir_names[0])
        raw_reviews = spark.read.json(first_subfolder)
        # Newlines become spaces so each review stays on a single line
        return raw_reviews.withColumn(
            "review_text", F.translate("review_text", '\n', ' ')
        )
    return None

def combine_into_dataframe(folder_path):
    """Read every subfolder under ``folder_path`` and union them into one DataFrame.

    Each subfolder found by ``os.walk`` is parsed with ``spark.read.json`` and
    its ``review_text`` column is cleaned by replacing newlines with a space,
    the same rule ``initialize_dataframe`` applies. (Previously the folders
    after the first one had their newlines deleted outright, which glued the
    words on either side together and cleaned the data inconsistently.)

    The tree is walked once, so the result no longer depends on two separate
    walks enumerating the subfolders in the same order.

    Parameters
    ----------
    folder_path : str
        Directory whose subfolders each hold JSON review files.

    Returns
    -------
    DataFrame or None
        The union of all subfolder DataFrames, or ``None`` when
        ``folder_path`` has no subfolders.
    """
    combined = None

    for dirpath, dirnames, _filenames in os.walk(folder_path):
        for dirname in dirnames:
            subpath = os.path.join(dirpath, dirname)
            reviews = spark.read.json(subpath)
            # Newline -> space keeps each review on one line without fusing words
            reviews = reviews.withColumn(
                "review_text", F.translate("review_text", '\n', ' ')
            )
            # NOTE: DataFrame.union matches columns by position, so every
            # subfolder is expected to yield the same schema.
            combined = reviews if combined is None else combined.union(reviews)

    return combined

In [None]:
# Combine all reviews of one run folder into a single dataframe.
# `path` is the run folder assigned in the cleanup cell above.
# NOTE: the name `df` is rebound in the "Combine all data" section further down,
# so re-run this cell before exporting a single run.

df = combine_into_dataframe(path)
# Preview the combined rows as a sanity check before writing anything to disk
df.show()

In [None]:
# Write all reviews of this run to a single CSV output named after the run folder.
# normpath drops any trailing separator first; without it, a path ending in a
# separator would split into an empty last component and the output would be
# named just "full_csv_".
folders = os.path.split(os.path.normpath(path))
last_folder = folders[-1]
# coalesce(1) collapses the data to one partition so a single part file is written.
# NOTE(review): the output path is relative to the working directory, while the
# "Combine all data" cell reads from ...\Processed_Data\full_csv_*; presumably the
# result is moved there by hand. Confirm.
df.coalesce(1).write.option("header",True).csv("full_csv_"+last_folder)

# Combine all data

In [None]:
# Folder holding the per-run exports (one full_csv_<timestamp> folder per run).
PROCESSED_DATA_DIR = r"C:\Users\jef-w\Desktop\Uni\KUL\Year_1\Advanced_Analytics\Assignments\Assignment_3\spark\notebooks\Processed_Data"

# Timestamps of the runs to combine; add a new entry here to include another run.
RUN_TIMESTAMPS = [
    "2023-04-11-12-45",
    "2023-04-11-22-16",
    "2023-04-12-11-00",
    "2023-04-12-15-05",
    "2023-04-12-20-05",
    "2023-04-13-13-51",
    "2023-04-13-17-57",
    "2023-04-13-20-39",
]

# Raw f-string keeps the literal backslash, so the resulting paths are
# byte-identical to the previously hand-written ones.
csv_paths = [rf"{PROCESSED_DATA_DIR}\full_csv_{stamp}" for stamp in RUN_TIMESTAMPS]

# One dataframe per run, read with the header row as column names.
run_frames = [
    spark.read.format("csv").option("header","true").load(csv_path)
    for csv_path in csv_paths
]

# Union the runs in list order (same order as before: oldest run first).
df = run_frames[0]
for run_frame in run_frames[1:]:
    df = df.union(run_frame)

In [None]:
# Write the union of all runs to a single CSV output with a header row.
# coalesce(1) collapses the data to one partition before writing.
# The target "full_data_csv" is relative to the current working directory.
df.coalesce(1).write.option("header",True).csv("full_data_csv")