In [0]:
import getpass
import os

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, length, avg
from pyspark.sql.types import StringType
# Build (or reuse, if one is already active) the SparkSession used by the cells below.
spark = (
    SparkSession.builder
    .appName("DataProcessing")
    .getOrCreate()
)

In [0]:






def item_least_rating(data):
    """Return the row with the lowest non-null ``rating`` as a dict, or None.

    Args:
        data: Spark DataFrame expected to contain a ``rating`` column.

    Returns:
        ``dict`` built from the first row after sorting ascending by ``rating``,
        or ``None`` when the DataFrame is empty or every ``rating`` is null.
    """
    # An ascending Spark sort places nulls first, so unrated rows are removed
    # before sorting; otherwise a null rating would be reported as the minimum.
    # first() already returns None on an empty result, so no separate count()
    # job is needed to guard against empty input.
    least_rated_row = data.dropna(subset=["rating"]).orderBy("rating").first()
    return least_rated_row.asDict() if least_rated_row is not None else None

def item_most_rating(data):
    """Return the row with the highest non-null ``rating`` as a dict, or None.

    Args:
        data: Spark DataFrame expected to contain a ``rating`` column.

    Returns:
        ``dict`` built from the first row after sorting descending by
        ``rating``, or ``None`` when the DataFrame is empty or every ``rating``
        is null.
    """
    # A descending sort already puts nulls last; dropping them as well means an
    # all-null column yields None instead of a row whose rating is null.
    # first() returns None on an empty result, so no extra count() job is run.
    most_rated_row = (
        data.dropna(subset=["rating"])
        .orderBy(col("rating").desc())
        .first()
    )
    return most_rated_row.asDict() if most_rated_row is not None else None

def item_longest_review(data):
    """Return the row whose ``review_text`` is longest, as a dict, or None.

    Args:
        data: Spark DataFrame expected to contain a ``review_text`` column.

    Returns:
        ``dict`` built from the first row after sorting by
        ``length(review_text)`` descending, or ``None`` when the DataFrame is
        empty or every ``review_text`` is null.
    """
    # length(null) is null, so rows without text have no meaningful length and
    # are excluded rather than being reported when no text exists at all.
    # first() returns None on an empty result, so no extra count() job is run.
    longest_review_row = (
        data.dropna(subset=["review_text"])
        .orderBy(length("review_text").desc())
        .first()
    )
    return longest_review_row.asDict() if longest_review_row is not None else None

def transform_date(data, source_col="old_date_col", target_col="new_date"):
    """Add a string-typed copy of a column to ``data``.

    Args:
        data: Spark DataFrame that must contain ``source_col``.
        source_col: name of the column to cast to string. Defaults to the
            original placeholder ``"old_date_col"``.
        target_col: name of the resulting string column. Defaults to
            ``"new_date"``.

    Returns:
        A new DataFrame with ``target_col`` added (``withColumn`` replaces an
        existing column of the same name).
    """
    return data.withColumn(target_col, data[source_col].cast(StringType()))

def desired_dataframe_operation(data):
    """Compute the mean ``rating`` per ``item_id`` and return it as a list of Rows.

    The aggregation is done by Spark; ``collect()`` then brings every group
    back to the driver, so the result grows with the number of distinct items.
    Each Row carries ``item_id`` and ``avg_rating``.
    """
    mean_rating = avg("rating").alias("avg_rating")
    rows_by_item = data.groupBy("item_id")
    return rows_by_item.agg(mean_rating).collect()


In [0]:
    #--------------------------------------------------------------------------
    def save_as_parquet(transformed_data, savefile_path="/..../output_data.parquet"):
        """Save a DataFrame as Parquet, overwriting any existing output at the path.

        This is a function rather than top-level statements because the
        DataFrame to save (``transformed_data``) is only created as a local
        variable inside ``main()``; it does not exist at notebook scope when
        this cell runs.

        Args:
            transformed_data: Spark DataFrame to write.
            savefile_path: destination path. The default is a placeholder --
                replace it with your desired output location (for example a
                path in the Databricks File System, DBFS).
        """
        # mode("overwrite") replaces whatever already exists at savefile_path.
        transformed_data.write.mode("overwrite").parquet(savefile_path)

    #--------------------------------------------------------------------------


In [0]:

def _report(result, header, missing_message):
    """Print ``header`` then ``result`` when ``result`` is truthy, else ``missing_message``."""
    if result:
        print(header)
        print(result)
    else:
        print(missing_message)


def main(input_path="dbfs:/FileStore/shared_uploads/wesleyjohnjayakumar@gmail.com/australian_user_reviews.json"):
    """Load the review JSON and print summary results computed from it.

    Prints the least-rated item, the most-rated item, the item with the
    longest review, and the average rating per item (computed on the output
    of ``transform_date``).

    Args:
        input_path: location of the JSON input. The default is the original
            hard-coded DBFS upload, so a bare ``main()`` call behaves as before.
    """
    sample_data = spark.read.format("json").load(input_path)

    # (finder, header printed before a hit, message printed when nothing is found).
    # Each finder returns a dict for the matching row, or None.
    single_item_reports = (
        (item_least_rating,
         "Item with the least rating:",
         "No data found for item with the least rating."),
        (item_most_rating,
         "\nItem with the most rating:",
         "No data found for item with the most rating."),
        (item_longest_review,
         "\nItem with the longest review:",
         "No data found for item with the longest review."),
    )
    for finder, header, missing_message in single_item_reports:
        _report(finder(sample_data), header, missing_message)

    #--------------------------------------------------------------------------

    transformed_data = transform_date(sample_data)

    desired_operation_result = desired_dataframe_operation(transformed_data)
    if desired_operation_result:
        print("\nDesired DataFrame operation result: Average Rating per Item")
        for row in desired_operation_result:
            print(row)
    else:
        print("No data found for desired DataFrame operation.")


if __name__ == "__main__":
    main()


In [0]:
from pyspark.sql import SparkSession

# getOrCreate() returns the already-active SparkSession if there is one, so the
# app name here only takes effect when a fresh session has to be started.
spark = SparkSession.builder.appName("ParquetToMySQL").getOrCreate()

# Read the Parquet data back. This uses the same placeholder path as the
# Parquet save cell earlier in the notebook, so it reads that output; replace
# it with the actual file path.
file_path = "/..../output_data.parquet"
loaded_df = spark.read.parquet(file_path)

# Display a preview of the loaded DataFrame
loaded_df.show()



In [0]:

# Define the MySQL properties.
# Connection settings come from environment variables (falling back to the
# previous values for the non-secret ones) so credentials are not stored in the
# notebook. The password is never hard-coded: it is taken from MYSQL_PASSWORD or,
# if that is unset/empty, prompted for interactively.
db_name = os.environ.get("MYSQL_DB", "mydb")
host = os.environ.get("MYSQL_HOST", "localhost")
user = os.environ.get("MYSQL_USER", "root")
password = os.environ.get("MYSQL_PASSWORD") or getpass.getpass("MySQL password: ")
table_name = os.environ.get("MYSQL_TABLE", "table_name")

# Write the DataFrame to the MySQL table; mode("overwrite") replaces the
# table's existing contents.
(
    loaded_df.write.format("jdbc")
    .option("url", f"jdbc:mysql://{host}/{db_name}")
    .option("dbtable", table_name)
    .option("user", user)
    .option("password", password)
    .mode("overwrite")
    .save()
)

# Stop the SparkSession. Any cell run after this needs a new session from
# SparkSession.builder...getOrCreate().
spark.stop()