In [37]:
import org.apache.spark.sql.{SparkSession, SaveMode}
import org.apache.spark.sql.functions._
import java.time.Instant
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import spark.implicits._
import org.apache.spark.sql.types._


## Case Study 5: Time-Based Data Partitioning for Ratings

Objective: Partition user ratings data by year and save in Parquet format.

Scenario: The ratings.csv file includes a timestamp field that needs to be converted into human-readable years, and the data needs to be stored year-wise.
Steps:

Ingestion: Load ratings.csv as a DataFrame from GCP Cloud Storage.

Transformation:

Use DataFrames to convert the timestamp field into a year column.
Convert the DataFrame to an RDD to partition records by year using a key-value pair transformation.
Partitioning:

Save RDD partitions as separate Parquet files in HDFS, with the structure /ratings/{year}/ratings.parquet.
Verification:

Ensure that each year folder in HDFS contains only the records for that year.

In [38]:
// Step 1: Initialize SparkSession
val spark = SparkSession.builder()
  .appName("Time-Based Data Partitioning for Ratings")
  .getOrCreate()

// Step 2: Load ratings.csv as a DataFrame from GCP
val bucketName = "scala_assgn_bucket"
val ratingsPath = s"gs://$bucketName/ml-32m/ratings.csv"
val ratingsDF = spark.read.option("header", "true").csv(ratingsPath)

// Step 3: Convert the timestamp field to a year column
val ratingsWithYearDF = ratingsDF.withColumn(
  "year",
  year(from_unixtime(col("timestamp").cast("long")))
)
ratingsWithYearDF.show()


+------+-------+------+---------+----+
|userId|movieId|rating|timestamp|year|
+------+-------+------+---------+----+
|     1|     17|   4.0|944249077|1999|
|     1|     25|   1.0|944250228|1999|
|     1|     29|   2.0|943230976|1999|
|     1|     30|   5.0|944249077|1999|
|     1|     32|   5.0|943228858|1999|
|     1|     34|   2.0|943228491|1999|
|     1|     36|   1.0|944249008|1999|
|     1|     80|   5.0|944248943|1999|
|     1|    110|   3.0|943231119|1999|
|     1|    111|   5.0|944249008|1999|
|     1|    161|   1.0|943231162|1999|
|     1|    166|   5.0|943228442|1999|
|     1|    176|   4.0|944079496|1999|
|     1|    223|   3.0|944082810|1999|
|     1|    232|   5.0|943228442|1999|
|     1|    260|   5.0|943228696|1999|
|     1|    302|   4.0|944253272|1999|
|     1|    306|   5.0|944248888|1999|
|     1|    307|   5.0|944253207|1999|
|     1|    322|   4.0|944053801|1999|
+------+-------+------+---------+----+
only showing top 20 rows



spark = org.apache.spark.sql.SparkSession@6f78dfd2
bucketName = scala_assgn_bucket
ratingsPath = gs://scala_assgn_bucket/ml-32m/ratings.csv
ratingsDF = [userId: string, movieId: string ... 2 more fields]
ratingsWithYearDF = [userId: string, movieId: string ... 3 more fields]


[userId: string, movieId: string ... 3 more fields]

In [39]:
// Step 4: Transformation - Filter out invalid or incomplete records
val validRatingsDF = ratingsWithYearDF
  .filter(col("userId").isNotNull && col("movieId").isNotNull && col("rating").isNotNull && col("timestamp").isNotNull)

validRatingsDF = [userId: string, movieId: string ... 3 more fields]


[userId: string, movieId: string ... 3 more fields]

In [40]:
// Convert DataFrame to RDD for partitioning by year
val trimmedRatingsDF = validRatingsDF.limit(100000)
val ratingsByYearRDD = trimmedRatingsDF
  .rdd
  .keyBy(row => row.getAs[Int]("year")) // Key RDD by year

trimmedRatingsDF = [userId: string, movieId: string ... 3 more fields]
ratingsByYearRDD = MapPartitionsRDD[23] at keyBy at <console>:93


MapPartitionsRDD[23] at keyBy at <console>:93

In [41]:
// import org.apache.spark.rdd.RDD
// import org.apache.hadoop.fs.{FileSystem, Path}
// import java.io.{BufferedWriter, OutputStreamWriter}

// val outputPath = "hdfs:///user/shraman_jana/user-data"
// val first10Users = groupedByUserRDD.take(10)
// first10Users.foreach { case (userId, ratingsList) =>
//   val userFolderPath = s"${outputPath}/${userId}/ratings.csv"
//   val path = new Path(userFolderPath)
  
//   val fs = FileSystem.get(new java.net.URI("hdfs:///"), new org.apache.hadoop.conf.Configuration())
  
//   if (!fs.exists(path.getParent)) {
//     fs.mkdirs(path.getParent)
//   }

//   val ratingsText = ratingsList.map { case (movieId, rating) =>
//     s"${movieId}, ${rating}"
//   }.mkString("\n")

//   val outputStream = fs.create(path)
//   val writer = new BufferedWriter(new OutputStreamWriter(outputStream))

//   writer.write(ratingsText)

//   writer.close()
//   outputStream.close()
// }

val outputPath = "hdfs:///user/shraman_jana/user-data/Q5"
val groupedByYearRDD = ratingsByYearRDD.groupByKey().mapValues(_.toList)
// val first10Groups = groupedByYearRDD.take(10)

// Verification: Count records per year and print s

outputPath = hdfs:///user/shraman_jana/user-data/Q5
groupedByYearRDD = MapPartitionsRDD[25] at mapValues at <console>:118


MapPartitionsRDD[25] at mapValues at <console>:118

In [12]:
groupedByYearRDD.take(5).foreach(println)

(2021,List([51607,356,4.5,1628551564,2021], [51607,541,4.0,1628375363,2021], [51607,593,4.0,1628367490,2021], [51607,1080,4.5,1628375208,2021], [51607,1197,5.0,1628375164,2021], [51607,1214,3.5,1628375083,2021], [51607,1265,4.0,1628724080,2021], [51607,1407,4.0,1628375127,2021], [51607,1682,5.0,1628551367,2021], [51607,2997,4.5,1628375392,2021], [51607,3949,4.0,1629157686,2021], [51607,7022,4.0,1628367562,2021], [51607,26662,4.5,1628717159,2021], [51607,27704,2.5,1628367565,2021], [51607,50641,5.0,1628367551,2021], [51607,54785,3.0,1628375103,2021], [51607,188773,4.5,1628443104,2021], [51607,193041,5.0,1628443090,2021], [51607,250010,4.5,1628374997,2021], [51613,17,4.5,1627660600,2021], [51613,39,2.5,1627660588,2021], [51613,110,3.5,1627660467,2021], [51613,185,4.0,1627660016,2021], [51613,253,2.5,1627660582,2021], [51613,339,4.0,1627660022,2021], [51613,356,3.0,1627660140,2021], [51613,588,4.0,1627660240,2021], [51613,1265,3.5,1627660186,2021], [51613,1704,2.5,1627660649,2021], [51613

In [42]:

// Define a schema for the DataFrame
val schema = StructType(Seq(
  StructField("userId", StringType, nullable = true),
  StructField("movieId", StringType, nullable = true),
  StructField("rating", StringType, nullable = true),
  StructField("timestamp", StringType, nullable = true)
))
// Partition the RDD by year
val first10Groups = groupedByYearRDD.take(10)
first10Groups.foreach { case (year, records) =>
    // Convert the grouped records back to a DataFrame
    val yearDF = spark.createDataFrame(spark.sparkContext.parallelize(records), schema)
    
    // Save each year's data in Parquet format
    yearDF.write
      .mode(SaveMode.Overwrite)
      .parquet(s"$outputPath/$year/ratings.parquet")
  }


schema = StructType(StructField(userId,StringType,true),StructField(movieId,StringType,true),StructField(rating,StringType,true),StructField(timestamp,StringType,true))
first10Groups = Array((2021,List([31461,50,4.0,1626605929,2021], [31461,296,3.0,1626605547,2021], [31461,457,4.0,1610381751,2021], [31461,527,4.5,1626624253,2021], [31461,912,0.5,1626624526,2021], [31461,1196,0.5,1626624287,2021], [31461,1566,4.5,1626605564,2021], [31461,1688,4.5,1626605566,2021], [31461,2671,3.5,1626605470,2021], [31461,3949,3.5,1610381743,2021], [31461,4886,4.5,1626605579,2021], [31461,4896,4.5,1626605427,2021], [31461,5816,4.5,1626605440,2021], [31461,8368,4.5,1626605441,2021], [31461,40815,4.5,1626605444,2021], [3146...


Array((2021,List([31461,50,4.0,1626605929,2021], [31461,296,3.0,1626605547,2021], [31461,457,4.0,1610381751,2021], [31461,527,4.5,1626624253,2021], [31461,912,0.5,1626624526,2021], [31461,1196,0.5,1626624287,2021], [31461,1566,4.5,1626605564,2021], [31461,1688,4.5,1626605566,2021], [31461,2671,3.5,1626605470,2021], [31461,3949,3.5,1610381743,2021], [31461,4886,4.5,1626605579,2021], [31461,4896,4.5,1626605427,2021], [31461,5816,4.5,1626605440,2021], [31461,8368,4.5,1626605441,2021], [31461,40815,4.5,1626605444,2021], [3146...

In [43]:
sc.stop()