<a href="https://colab.research.google.com/github/evellynliena/PrakBigData/blob/master/Week_3.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Build a small in-memory employee table used to practise the DataFrame API.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName('HandsOnPertemuan3')
    .getOrCreate()
)

# Column order matches the position of each value inside the row tuples.
columns = ['EmployeeName', 'Department', 'Salary']
data = [
    ('James', 'Sales', 3000),
    ('Michael', 'Sales', 4600),
    ('Robert', 'Sales', 4100),
    ('Maria', 'Finance', 3000),
]

df = spark.createDataFrame(data, columns)
df.show()

+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|       James|     Sales|  3000|
|     Michael|     Sales|  4600|
|      Robert|     Sales|  4100|
|       Maria|   Finance|  3000|
+------------+----------+------+



In [None]:
# Basic DataFrame operations on the employee table: projection, filtering and
# per-department aggregation.
from pyspark.sql import SparkSession
# Aggregate functions are reached through the `F` namespace. Importing `max`
# and `sum` by name would replace the Python builtins of the same name for
# every cell executed afterwards.
from pyspark.sql import functions as F
from pyspark.sql.functions import col  # retained from the original cell; not used below

spark = SparkSession.builder.appName('HandsOnPertemuan3').getOrCreate()

data = [
    ('James', 'Sales', 3000),
    ('Michael', 'Sales', 4600),
    ('Robert', 'Sales', 4100),
    ('Maria', 'Finance', 3000)
]
columns = ['EmployeeName', 'Department', 'Salary']

df = spark.createDataFrame(data, schema=columns)

print("=== DataFrame Awal ===")
df.show()

# Projection: keep only two of the three columns.
print("=== Select EmployeeName & Salary ===")
df.select('EmployeeName', 'Salary').show()

# Row filter: strictly greater than 3000, so salaries equal to 3000 are dropped.
print("=== Filter Salary > 3000 ===")
df.filter(df['Salary'] > 3000).show()

# Shortcut aggregations; result columns get the default names
# avg(Salary) / sum(Salary).
print("=== Rata-rata gaji per Department ===")
df.groupBy('Department').avg('Salary').show()

print("=== Total gaji per departemen ===")
df.groupBy("Department").sum("Salary").show()

# Several aggregates in one pass, each with an explicit output column name.
print("=== Insight per departemen (mean, max, sum) ===")
df.groupBy("Department").agg(
    F.mean("Salary").alias("AvgSalary"),
    F.max("Salary").alias("MaxSalary"),
    F.sum("Salary").alias("TotalSalary")
).show()

=== DataFrame Awal ===
+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|       James|     Sales|  3000|
|     Michael|     Sales|  4600|
|      Robert|     Sales|  4100|
|       Maria|   Finance|  3000|
+------------+----------+------+

=== Select EmployeeName & Salary ===
+------------+------+
|EmployeeName|Salary|
+------------+------+
|       James|  3000|
|     Michael|  4600|
|      Robert|  4100|
|       Maria|  3000|
+------------+------+

=== Filter Salary > 3000 ===
+------------+----------+------+
|EmployeeName|Department|Salary|
+------------+----------+------+
|     Michael|     Sales|  4600|
|      Robert|     Sales|  4100|
+------------+----------+------+

=== Rata-rata gaji per Department ===
+----------+-----------+
|Department|avg(Salary)|
+----------+-----------+
|     Sales|     3900.0|
|   Finance|     3000.0|
+----------+-----------+

=== Total gaji per departemen ===
+----------+-----------+
|Department|sum(Salary

In [None]:
# Derive two computed columns (bonus and total compensation) from Salary.
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

spark = (
    SparkSession.builder
    .appName('HandsOnPertemuan3')
    .getOrCreate()
)

columns = ['EmployeeName', 'Department', 'Salary']
data = [
    ('James', 'Sales', 3000),
    ('Michael', 'Sales', 4600),
    ('Robert', 'Sales', 4100),
    ('Maria', 'Finance', 3000),
]

df = spark.createDataFrame(data, schema=columns)

# Step 1: the bonus is 10% of the salary.
bonus_expr = col('Salary') * 0.1
df = df.withColumn('SalaryBonus', bonus_expr)
df.show()

# Step 2: total compensation is salary plus the bonus added in step 1.
total_expr = col('Salary') + col('SalaryBonus')
df = df.withColumn('TotalCompensation', total_expr)
df.show()

+------------+----------+------+-----------+
|EmployeeName|Department|Salary|SalaryBonus|
+------------+----------+------+-----------+
|       James|     Sales|  3000|      300.0|
|     Michael|     Sales|  4600|      460.0|
|      Robert|     Sales|  4100|      410.0|
|       Maria|   Finance|  3000|      300.0|
+------------+----------+------+-----------+

+------------+----------+------+-----------+-----------------+
|EmployeeName|Department|Salary|SalaryBonus|TotalCompensation|
+------------+----------+------+-----------+-----------------+
|       James|     Sales|  3000|      300.0|           3300.0|
|     Michael|     Sales|  4600|      460.0|           5060.0|
|      Robert|     Sales|  4100|      410.0|           4510.0|
|       Maria|   Finance|  3000|      300.0|           3300.0|
+------------+----------+------+-----------+-----------------+



In [None]:
# Rank employees inside each department by ascending salary.
# Relies on `df` built by the previous cell.
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# One partition per department, rows ordered from lowest to highest salary.
windowSpec = (
    Window
    .partitionBy('Department')
    .orderBy('Salary')
)

salary_rank = F.rank().over(windowSpec)
df.withColumn('Rank', salary_rank).show()

+------------+----------+------+-----------+-----------------+----+
|EmployeeName|Department|Salary|SalaryBonus|TotalCompensation|Rank|
+------------+----------+------+-----------+-----------------+----+
|       Maria|   Finance|  3000|      300.0|           3300.0|   1|
|       James|     Sales|  3000|      300.0|           3300.0|   1|
|      Robert|     Sales|  4100|      410.0|           4510.0|   2|
|     Michael|     Sales|  4600|      460.0|           5060.0|   3|
+------------+----------+------+-----------+-----------------+----+



# **TUGAS 5**

In [None]:
# Download the movies dataset through kagglehub and report where it landed.
import kagglehub

path = kagglehub.dataset_download(
    "rounakbanik/the-movies-dataset"
)

print("Path to dataset files:", path)

Path to dataset files: /kaggle/input/the-movies-dataset


In [None]:
# Load movies_metadata.csv into Spark and cast the numeric columns that the
# later cells aggregate on.
import os

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("MoviesDataAnalysis").getOrCreate()

# Use the directory reported by kagglehub in the download cell instead of a
# hard-coded location, so the notebook keeps working if the download path
# differs between environments.
movies_csv = os.path.join(path, "movies_metadata.csv")

# multiLine / quote / escape: free-text columns can contain quoted line breaks
# and doubled quotes. Without these options Spark splits such records across
# several rows and shifts values into the wrong columns.
movies_df = spark.read.csv(
    movies_csv,
    header=True,
    inferSchema=True,
    multiLine=True,
    quote='"',
    escape='"'
)

# Explicit casts: values that cannot be parsed as a number become null rather
# than raising, which keeps malformed rows from breaking the aggregations.
movies_df_clean = movies_df \
    .withColumn("budget", F.col("budget").cast("long")) \
    .withColumn("revenue", F.col("revenue").cast("long")) \
    .withColumn("runtime", F.col("runtime").cast("double")) \
    .withColumn("popularity", F.col("popularity").cast("double")) \
    .withColumn("vote_average", F.col("vote_average").cast("double")) \
    .withColumn("vote_count", F.col("vote_count").cast("long"))

# Show the schema of the cleaned frame so the casts above are visible.
print("=== Struktur DataFrame ===")
movies_df_clean.printSchema()

=== Struktur DataFrame ===
root
 |-- adult: string (nullable = true)
 |-- belongs_to_collection: string (nullable = true)
 |-- budget: string (nullable = true)
 |-- genres: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- id: string (nullable = true)
 |-- imdb_id: string (nullable = true)
 |-- original_language: string (nullable = true)
 |-- original_title: string (nullable = true)
 |-- overview: string (nullable = true)
 |-- popularity: string (nullable = true)
 |-- poster_path: string (nullable = true)
 |-- production_companies: string (nullable = true)
 |-- production_countries: string (nullable = true)
 |-- release_date: string (nullable = true)
 |-- revenue: string (nullable = true)
 |-- runtime: string (nullable = true)
 |-- spoken_languages: string (nullable = true)
 |-- status: string (nullable = true)
 |-- tagline: string (nullable = true)
 |-- title: string (nullable = true)
 |-- video: string (nullable = true)
 |-- vote_average: string (nullable = true)


In [None]:
# Highest-grossing movie for every release year.
from pyspark.sql.window import Window

# Keep only movies with a positive revenue and attach the release year.
movies_nonzero = (
    movies_df_clean
    .filter(F.col("revenue") > 0)
    .withColumn("year", F.year("release_date"))
)

# Within each year, order movies from highest to lowest revenue.
windowSpec = Window.partitionBy("year").orderBy(F.col("revenue").desc())

# rank() gives every movie tied for the top revenue of a year the value 1,
# so a year can contribute more than one row.
revenue_rank = F.rank().over(windowSpec)
top_movies = (
    movies_nonzero
    .withColumn("Rank", revenue_rank)
    .filter(F.col("Rank") == 1)
    .select("year", "title", "revenue", "Rank")
    .orderBy("year")
)

# Rows whose release date did not yield a year are dropped before display.
top_movies_clean = top_movies.filter(F.col("year").isNotNull())
top_movies_clean.show(20)

+----+--------------------+--------+----+
|year|               title| revenue|Rank|
+----+--------------------+--------+----+
|1915|The Birth of a Na...|11000000|   1|
|1916|20,000 Leagues Un...| 8000000|   1|
|1918|              Mickey| 8000000|   1|
|1921|             The Kid| 2500000|   1|
|1922|       Foolish Wives|  400200|   1|
|1923|        Safety Last!|     623|   1|
|1924| The Thief of Bagdad| 1213880|   1|
|1925|      The Big Parade|22000000|   1|
|1926|            Sparrows|  966878|   1|
|1927|        My Best Girl| 1027757|   1|
|1928|  Lights of New York| 1200000|   1|
|1929| The Broadway Melody| 4358000|   1|
|1930|       Hell's Angels| 8000000|   1|
|1931|        Frankenstein|12000000|   1|
|1932|         Grand Hotel| 2594000|   1|
|1933|           King Kong|10000000|   1|
|1934|It Happened One N...| 4500000|   1|
|1935|Mutiny on the Bounty| 4460000|   1|
|1936|        Modern Times| 8500000|   1|
|1937|        Pépé le Moko|  153936|   1|
+----+--------------------+-------

In [None]:
# Average, maximum and minimum rating over all movies with a valid vote_average.
#
# Reads from `movies_df` (the frame produced by the load cell). The previous
# version read `movies_df2`, which is only created by the genre cell further
# down, so this cell raised NameError when the notebook was run top to bottom.
# `vote_average` is the same column in both frames, so the numbers do not change.
movies_clean = movies_df.withColumn(
    "vote_average", F.col("vote_average").cast("double")
)

# Keep only ratings inside the 0-10 range; values that failed the cast are
# null and are removed as well.
movies_clean = movies_clean.filter(
    (F.col("vote_average").isNotNull()) &
    (F.col("vote_average") >= 0) &
    (F.col("vote_average") <= 10)
)

movies_clean.select(
    F.avg("vote_average").alias("AvgRating"),
    F.max("vote_average").alias("MaxRating"),
    F.min("vote_average").alias("MinRating")
).show()


+-----------------+---------+---------+
|        AvgRating|MaxRating|MinRating|
+-----------------+---------+---------+
|5.590573954904963|     10.0|      0.0|
+-----------------+---------+---------+



In [None]:
# Count how many movies carry each genre.
from pyspark.sql.types import ArrayType, StructType, StructField, StringType
from pyspark.sql.functions import from_json, explode, col

# The genres column is parsed as a JSON array of {id, name} objects;
# both fields are read as strings.
genre_entry = StructType([
    StructField("id", StringType()),
    StructField("name", StringType()),
])
genre_schema = ArrayType(genre_entry)

# Replace the raw JSON text with the parsed array of structs.
parsed_genres = from_json(col("genres"), genre_schema)
movies_df2 = movies_df.withColumn("genres", parsed_genres)

# explode() emits one row per genre name, so a movie with several genres is
# counted once under each of them.
genre_counts = (
    movies_df2
    .withColumn("genre", explode(col("genres.name")))
    .groupBy("genre")
    .count()
    .orderBy(F.col("count").desc())
)

genre_counts.show(10)

+---------------+-----+
|          genre|count|
+---------------+-----+
|          Drama|20239|
|         Comedy|13123|
|       Thriller| 7612|
|        Romance| 6722|
|         Action| 6581|
|         Horror| 4656|
|          Crime| 4294|
|    Documentary| 3929|
|      Adventure| 3477|
|Science Fiction| 3041|
+---------------+-----+
only showing top 10 rows



In [None]:
# Top three movies by revenue for every release year.
#
# The frame is derived here from `movies_df2` instead of reusing `movies_clean`.
# At this point in the notebook `movies_clean` is the rating-statistics frame,
# which has no `year` column, so the previous version only worked when a later
# cell had been executed first and failed on a top-to-bottom run.
movies_final = (
    movies_df2
    .withColumn("revenue", F.col("revenue").cast("double"))
    .withColumn("release_date", F.to_date("release_date", "yyyy-MM-dd"))
    .withColumn("year", F.year("release_date"))
    # Keep movies with a positive revenue and a release date that parsed.
    .filter((F.col("revenue") > 0) & F.col("year").isNotNull())
)

# dense_rank() leaves no gaps after ties, so "Rank <= 3" keeps the three
# highest distinct revenue values of each year.
topN_window = Window.partitionBy("year").orderBy(F.desc("revenue"))
movies_top3 = movies_final.withColumn("Rank", F.dense_rank().over(topN_window)) \
                          .filter(F.col("Rank") <= 3)

movies_top3.select("year", "title", "revenue", "Rank") \
    .orderBy("year", "Rank") \
    .show(20, truncate=False)

+----+-----------------------------+---------+----+
|year|title                        |revenue  |Rank|
+----+-----------------------------+---------+----+
|1915|The Birth of a Nation        |1.1E7    |1   |
|1915|The Cheat                    |137365.0 |2   |
|1916|20,000 Leagues Under the Sea |8000000.0|1   |
|1918|Mickey                       |8000000.0|1   |
|1921|The Kid                      |2500000.0|1   |
|1922|Foolish Wives                |400200.0 |1   |
|1923|Safety Last!                 |623.0    |1   |
|1924|The Thief of Bagdad          |1213880.0|1   |
|1925|The Big Parade               |2.2E7    |1   |
|1925|Ben-Hur: A Tale of the Christ|9000000.0|2   |
|1925|The Gold Rush                |2500000.0|3   |
|1926|Sparrows                     |966878.0 |1   |
|1926|Flesh and the Devil          |658000.0 |2   |
|1927|My Best Girl                 |1027757.0|1   |
|1927|Metropolis                   |650422.0 |2   |
|1927|Sunrise: A Song of Two Humans|121.0    |3   |
|1928|Lights

In [None]:
# Mean revenue of the movies released in each year.
avg_revenue = (
    movies_final
    .groupBy("year")
    .agg(F.avg("revenue").alias("AvgRevenue"))
)

# Show the first 20 years in chronological order.
avg_revenue.orderBy("year").show(20)

+----+------------------+
|year|        AvgRevenue|
+----+------------------+
|1915|         5568682.5|
|1916|         8000000.0|
|1918|         8000000.0|
|1921|         2500000.0|
|1922|          400200.0|
|1923|             623.0|
|1924|         1213880.0|
|1925|         6709020.2|
|1926|          812439.0|
|1927| 559433.3333333334|
|1928|          543529.6|
|1929|         4358000.0|
|1930|         4003970.0|
|1931|         3814592.5|
|1932|          798512.5|
|1933|         5074200.0|
|1934|2347666.6666666665|
|1935|         2010366.2|
|1936|         3734000.0|
|1937|          153936.0|
+----+------------------+
only showing top 20 rows



In [None]:
# Descriptive statistics for rating and revenue.
# vote_average is cast to double first so that the statistics are computed
# on numeric values.
rating_as_double = F.col("vote_average").cast("double")
movies_final = movies_final.withColumn("vote_average", rating_as_double)

rating_and_revenue = movies_final.select("vote_average", "revenue")
rating_and_revenue.summary("count", "mean", "min", "max").show()

+-------+-----------------+-------------------+
|summary|     vote_average|            revenue|
+-------+-----------------+-------------------+
|  count|             6795|               6798|
|   mean|6.215349521707127|6.907081362547809E7|
|    min|              0.0|                1.0|
|    max|              9.5|      2.787965087E9|
+-------+-----------------+-------------------+



In [None]:
# Running (cumulative) revenue inside each release year.
from pyspark.sql.types import DoubleType

# Cast revenue, parse the release date and derive the year, then keep only
# movies with a positive revenue and a usable year.
movies_clean = (
    movies_df2
    .withColumn("revenue", F.col("revenue").cast("double"))
    .withColumn("release_date", F.to_date("release_date", "yyyy-MM-dd"))
    .withColumn("year", F.year("release_date"))
)
movies_clean = movies_clean.filter(
    (F.col("revenue") > 0) & (F.col("year").isNotNull())
)

# Frame: from the first row of the year's partition up to the current row,
# with rows ordered by ascending revenue.
windowSpec = (
    Window
    .partitionBy("year")
    .orderBy("revenue")
    .rowsBetween(Window.unboundedPreceding, Window.currentRow)
)

running_total = F.sum("revenue").over(windowSpec)
movies_running = movies_clean.withColumn("CumulativeRevenue", running_total)

(
    movies_running
    .select("title", "year", "revenue", "CumulativeRevenue")
    .orderBy("year", "CumulativeRevenue")
    .show(15, truncate=False)
)


+-----------------------------+----+---------+-----------------+
|title                        |year|revenue  |CumulativeRevenue|
+-----------------------------+----+---------+-----------------+
|The Cheat                    |1915|137365.0 |137365.0         |
|The Birth of a Nation        |1915|1.1E7    |1.1137365E7      |
|20,000 Leagues Under the Sea |1916|8000000.0|8000000.0        |
|Mickey                       |1918|8000000.0|8000000.0        |
|The Kid                      |1921|2500000.0|2500000.0        |
|Foolish Wives                |1922|400200.0 |400200.0         |
|Safety Last!                 |1923|623.0    |623.0            |
|The Thief of Bagdad          |1924|1213880.0|1213880.0        |
|The Merry Widow              |1925|1.0      |1.0              |
|Battleship Potemkin          |1925|45100.0  |45101.0          |
|The Gold Rush                |1925|2500000.0|2545101.0        |
|Ben-Hur: A Tale of the Christ|1925|9000000.0|1.1545101E7      |
|The Big Parade          

In [None]:
# Rank every movie within its release year, highest revenue first.
rank_window = (
    Window
    .partitionBy("year")
    .orderBy(F.col("revenue").desc())
)

# rank() assigns equal positions to ties and leaves a gap after them.
revenue_rank = F.rank().over(rank_window)
movies_ranked = movies_clean.withColumn("Rank", revenue_rank)

(
    movies_ranked
    .select("title", "year", "revenue", "Rank")
    .orderBy("year", "Rank")
    .show(20, truncate=False)
)

+-----------------------------+----+---------+----+
|title                        |year|revenue  |Rank|
+-----------------------------+----+---------+----+
|The Birth of a Nation        |1915|1.1E7    |1   |
|The Cheat                    |1915|137365.0 |2   |
|20,000 Leagues Under the Sea |1916|8000000.0|1   |
|Mickey                       |1918|8000000.0|1   |
|The Kid                      |1921|2500000.0|1   |
|Foolish Wives                |1922|400200.0 |1   |
|Safety Last!                 |1923|623.0    |1   |
|The Thief of Bagdad          |1924|1213880.0|1   |
|The Big Parade               |1925|2.2E7    |1   |
|Ben-Hur: A Tale of the Christ|1925|9000000.0|2   |
|The Gold Rush                |1925|2500000.0|3   |
|Battleship Potemkin          |1925|45100.0  |4   |
|The Merry Widow              |1925|1.0      |5   |
|Sparrows                     |1926|966878.0 |1   |
|Flesh and the Devil          |1926|658000.0 |2   |
|My Best Girl                 |1927|1027757.0|1   |
|Metropolis 