In [2]:
import org.apache.spark.sql.{SparkSession, SaveMode}
import org.apache.spark.sql.functions._
import java.time.Instant
import java.time.ZoneId
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.types._

In [3]:
// Obtain (or reuse) the Spark session that backs this notebook.
val spark = SparkSession.builder()
  .appName("Time-Based Data Partitioning for Ratings")
  .getOrCreate()

// Location of the ratings CSV in Google Cloud Storage.
val bucketName = "first-job-bucket"
val ratingsPath = s"gs://$bucketName/ratings.csv"

// No schema is supplied and schema inference is not enabled, so every CSV column
// is loaded as a string; the first line of the file provides the column names.
val ratingsDF = spark.read
  .format("csv")
  .option("header", "true")
  .load(ratingsPath)

// Derive the calendar year of each rating from its epoch-seconds `timestamp`.
// from_unixtime renders the instant in the session time zone, so ratings close to
// a year boundary fall into whichever year that zone dictates.
val ratingsWithYearDF = {
  val epochSeconds = col("timestamp").cast("long")
  ratingsDF.withColumn("year", year(from_unixtime(epochSeconds)))
}

// Preview the first 20 rows.
ratingsWithYearDF.show(20)

+------+-------+------+---------+----+
|userId|movieId|rating|timestamp|year|
+------+-------+------+---------+----+
|     1|     17|   4.0|944249077|1999|
|     1|     25|   1.0|944250228|1999|
|     1|     29|   2.0|943230976|1999|
|     1|     30|   5.0|944249077|1999|
|     1|     32|   5.0|943228858|1999|
|     1|     34|   2.0|943228491|1999|
|     1|     36|   1.0|944249008|1999|
|     1|     80|   5.0|944248943|1999|
|     1|    110|   3.0|943231119|1999|
|     1|    111|   5.0|944249008|1999|
|     1|    161|   1.0|943231162|1999|
|     1|    166|   5.0|943228442|1999|
|     1|    176|   4.0|944079496|1999|
|     1|    223|   3.0|944082810|1999|
|     1|    232|   5.0|943228442|1999|
|     1|    260|   5.0|943228696|1999|
|     1|    302|   4.0|944253272|1999|
|     1|    306|   5.0|944248888|1999|
|     1|    307|   5.0|944253207|1999|
|     1|    322|   4.0|944053801|1999|
+------+-------+------+---------+----+
only showing top 20 rows



spark = org.apache.spark.sql.SparkSession@201b76c9
bucketName = first-job-bucket
ratingsPath = gs://first-job-bucket/ratings.csv
ratingsDF = [userId: string, movieId: string ... 2 more fields]
ratingsWithYearDF = [userId: string, movieId: string ... 3 more fields]


[userId: string, movieId: string ... 3 more fields]

In [4]:
// Keep only complete ratings: the four source fields must be present AND the derived
// `year` must have resolved. Under Spark's default (non-ANSI) casting, a timestamp that
// is non-null but not numeric casts to null and leaves `year` null. Such a row would
// later be read with getAs[Int], which yields 0 for a null primitive, and so would be
// filed under a bogus year 0.
val validRatingsDF = ratingsWithYearDF.filter(
  Seq("userId", "movieId", "rating", "timestamp", "year")
    .map(name => col(name).isNotNull)
    .reduce(_ && _)
)

validRatingsDF = [userId: string, movieId: string ... 3 more fields]


[userId: string, movieId: string ... 3 more fields]

In [5]:
// Work on at most 100,000 of the valid ratings. No ordering is applied before the
// limit, so which rows are kept is up to Spark.
val trimmedRatingsDF = validRatingsDF.limit(100000)

// Drop to the RDD API and pair every row with its integer `year`: (year, row).
val ratingsByYearRDD = trimmedRatingsDF.rdd.keyBy(_.getAs[Int]("year"))

trimmedRatingsDF = [userId: string, movieId: string ... 3 more fields]
ratingsByYearRDD = MapPartitionsRDD[23] at keyBy at <console>:37


MapPartitionsRDD[23] at keyBy at <console>:37

In [6]:
// HDFS base directory for this job's output.
val outputPath = "hdfs:///user/shraman_jana/user-data/Q5"

// Gather all rows that share a year into one in-memory List per key.
// groupByKey shuffles every row, and each year's rows are materialised together,
// so a single year's data has to fit in memory.
val groupedByYearRDD = ratingsByYearRDD
  .groupByKey()
  .mapValues(rowsForYear => rowsForYear.toList)

outputPath = hdfs:///user/shraman_jana/user-data/Q5
groupedByYearRDD = MapPartitionsRDD[25] at mapValues at <console>:35


MapPartitionsRDD[25] at mapValues at <console>:35

In [7]:
// Pull five (year, rows) groups to the driver and print each one.
for (yearGroup <- groupedByYearRDD.take(5)) println(yearGroup)

(2021,List([22,316,3.0,1614575253,2021], [22,5049,2.5,1622483509,2021], [22,155509,4.0,1622483544,2021], [22,182823,3.5,1618721258,2021], [22,191489,3.5,1614575270,2021], [22,197889,4.0,1617162338,2021], [22,210271,2.5,1617155546,2021], [22,218867,3.0,1614575299,2021], [22,235509,2.5,1611070726,2021], [22,247150,3.0,1622483323,2021], [23,2890,4.0,1631525045,2021], [23,45672,5.0,1614781934,2021], [23,110110,5.0,1612704900,2021], [23,112183,5.0,1635687207,2021], [23,113348,4.5,1632150664,2021], [23,138036,5.0,1615882370,2021], [23,142488,5.0,1635687186,2021], [23,167544,4.5,1624638843,2021], [23,210861,5.0,1618585183,2021], [23,219444,4.5,1614183386,2021], [23,250010,5.0,1637776534,2021], [28,1209,4.5,1635269264,2021], [28,6350,4.0,1635269318,2021], [28,26776,4.0,1635270009,2021], [28,31658,4.5,1635269250,2021], [28,65261,4.5,1635270913,2021], [28,70533,3.5,1635271336,2021], [28,84187,4.0,1635270872,2021], [28,90439,3.0,1635271693,2021], [28,101962,4.0,1635269711,2021], [28,103228,3.0,16

In [8]:
// Columns persisted for each rating, all nullable strings. `year` is not part of the
// schema; it appears in the output path instead.
val schema = StructType(
  Seq("userId", "movieId", "rating", "timestamp")
    .map(name => StructField(name, StringType, nullable = true))
)

// Only the first 10 year groups are fetched to the driver; any further years present
// in the data are not written by this cell.
val first10Groups = groupedByYearRDD.take(10)

// Write each fetched year to its own Parquet location, replacing any previous output.
// The pattern variable is named `ratingYear` so it does not shadow functions.year.
first10Groups.foreach { case (ratingYear, records) =>
  // The grouped rows still carry the trailing `year` column (5 values against a 4-field
  // schema). Cut each row down to the schema's width explicitly instead of relying on
  // Spark ignoring the extra field; the leading columns are already in schema order.
  val projectedRows = records.map { record =>
    org.apache.spark.sql.Row.fromSeq(record.toSeq.take(schema.length))
  }
  val yearDF = spark.createDataFrame(spark.sparkContext.parallelize(projectedRows), schema)

  yearDF.write
    .mode(SaveMode.Overwrite)
    .parquet(s"$outputPath/$ratingYear/ratings.parquet")
}

schema = StructType(StructField(userId,StringType,true),StructField(movieId,StringType,true),StructField(rating,StringType,true),StructField(timestamp,StringType,true))
first10Groups = Array((2021,List([22,316,3.0,1614575253,2021], [22,5049,2.5,1622483509,2021], [22,155509,4.0,1622483544,2021], [22,182823,3.5,1618721258,2021], [22,191489,3.5,1614575270,2021], [22,197889,4.0,1617162338,2021], [22,210271,2.5,1617155546,2021], [22,218867,3.0,1614575299,2021], [22,235509,2.5,1611070726,2021], [22,247150,3.0,1622483323,2021], [23,2890,4.0,1631525045,2021], [23,45672,5.0,1614781934,2021], [23,110110,5.0,1612704900,2021], [23,112183,5.0,1635687207,2021], [23,113348,4.5,1632150664,2021], [23,138036,5.0,16158823...


Array((2021,List([22,316,3.0,1614575253,2021], [22,5049,2.5,1622483509,2021], [22,155509,4.0,1622483544,2021], [22,182823,3.5,1618721258,2021], [22,191489,3.5,1614575270,2021], [22,197889,4.0,1617162338,2021], [22,210271,2.5,1617155546,2021], [22,218867,3.0,1614575299,2021], [22,235509,2.5,1611070726,2021], [22,247150,3.0,1622483323,2021], [23,2890,4.0,1631525045,2021], [23,45672,5.0,1614781934,2021], [23,110110,5.0,1612704900,2021], [23,112183,5.0,1635687207,2021], [23,113348,4.5,1632150664,2021], [23,138036,5.0,16158823...