# **LAB4_SparkSQL**

## **1. Show ID, title, and genres of all movies with ‘Action’ included in their genres**

In [87]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.appName("MovieApp").getOrCreate()

movies = spark.read.csv('movies_small.csv', header=True, inferSchema=True)

movies_with_action = movies.filter(
    f.array_contains(f.split(f.col('genres'), '\|'), 'Action')
  )

result1 = movies_with_action.select('movieId', 'title', 'genres')
result1.show(truncate=False)

spark.stop()

+-------+------------------------------------------+----------------------------------+
|movieId|title                                     |genres                            |
+-------+------------------------------------------+----------------------------------+
|6      |Heat (1995)                               |Action|Crime|Thriller             |
|9      |Sudden Death (1995)                       |Action                            |
|10     |GoldenEye (1995)                          |Action|Adventure|Thriller         |
|15     |Cutthroat Island (1995)                   |Action|Adventure|Romance          |
|20     |Money Train (1995)                        |Action|Comedy|Crime|Drama|Thriller|
|23     |Assassins (1995)                          |Action|Crime|Thriller             |
|42     |Dead Presidents (1995)                    |Action|Crime|Drama                |
|44     |Mortal Kombat (1995)                      |Action|Adventure|Fantasy          |
|66     |Lawnmower Man 2: Beyond

## **2. Show ID, title, and the number of genres of each movie, sorted by movieId**

In [83]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.appName("MovieApp").getOrCreate()

movies = spark.read.csv('movies_small.csv', header=True, inferSchema=True)

movies_with_action = movies.filter(
    f.array_contains(f.split(f.col('genres'), '\|'), 'Action')
)

result2 = movies.withColumn('count', f.size(f.split(f.col('genres'), '\|'))) \
                .select('movieId', 'title', 'count') \
                .orderBy('movieId')

result2.show(truncate=False)

+-------+-------------------------------------+-----+
|movieId|title                                |count|
+-------+-------------------------------------+-----+
|1      |Toy Story (1995)                     |5    |
|2      |Jumanji (1995)                       |3    |
|3      |Grumpier Old Men (1995)              |2    |
|4      |Waiting to Exhale (1995)             |3    |
|5      |Father of the Bride Part II (1995)   |1    |
|6      |Heat (1995)                          |3    |
|7      |Sabrina (1995)                       |2    |
|8      |Tom and Huck (1995)                  |2    |
|9      |Sudden Death (1995)                  |1    |
|10     |GoldenEye (1995)                     |3    |
|11     |American President, The (1995)       |3    |
|12     |Dracula: Dead and Loving It (1995)   |2    |
|13     |Balto (1995)                         |3    |
|14     |Nixon (1995)                         |1    |
|15     |Cutthroat Island (1995)              |3    |
|16     |Casino (1995)      

## **3. Show the number of movies of each genre, sorted by the count number**

In [84]:
result3 = movies.withColumn('genre', f.explode(f.split(f.col('genres'), '\|'))) \
                .groupBy('genre') \
                .agg(f.count('movieId').alias('num_movies')) \
                .orderBy(f.asc('num_movies'))

result3.show(truncate=False)

+------------------+----------+
|genre             |num_movies|
+------------------+----------+
|(no genres listed)|34        |
|Film-Noir         |87        |
|IMAX              |158       |
|Western           |167       |
|Musical           |334       |
|War               |382       |
|Documentary       |440       |
|Mystery           |573       |
|Animation         |611       |
|Children          |664       |
|Fantasy           |779       |
|Horror            |978       |
|Sci-Fi            |980       |
|Crime             |1199      |
|Adventure         |1263      |
|Romance           |1596      |
|Action            |1828      |
|Thriller          |1894      |
|Comedy            |3756      |
|Drama             |4361      |
+------------------+----------+



## **4. Show the list of all movies associated with each genres**

In [85]:
result4 = movies.withColumn('genre', f.explode(f.split(f.col('genres'), '\|'))) \
                .groupBy('genre') \
                .agg(f.collect_list('title').alias('movies_list')) \
                .show(truncate=100)

+------------------+----------------------------------------------------------------------------------------------------+
|             genre|                                                                                         movies_list|
+------------------+----------------------------------------------------------------------------------------------------+
|             Crime|[Heat (1995), Casino (1995), Money Train (1995), Get Shorty (1995), Copycat (1995), Assassins (19...|
|           Romance|[Grumpier Old Men (1995), Waiting to Exhale (1995), Sabrina (1995), American President, The (1995...|
|          Thriller|[Heat (1995), GoldenEye (1995), Money Train (1995), Get Shorty (1995), Copycat (1995), Assassins ...|
|         Adventure|[Toy Story (1995), Jumanji (1995), Tom and Huck (1995), GoldenEye (1995), Balto (1995), Cutthroat...|
|             Drama|[Waiting to Exhale (1995), American President, The (1995), Nixon (1995), Casino (1995), Sense and...|
|               War|[Ric

## **5. Show the years of the first_appearance of ‘Animation’ and ‘Sci-Fi’ movies**

In [86]:
movies = spark.read.csv('movies_small.csv', header=True, inferSchema=True)

movies_with_year = movies.withColumn('year', f.regexp_extract(f.col('title'), r'\((\d{4})\)', 1))

movies_exploded = movies_with_year.withColumn('genre', f.explode(f.split(f.col('genres'), '\|')))

animation_sci_fi_movies = movies_exploded.filter(f.col('genre').isin('Animation', 'Sci-Fi'))

result = animation_sci_fi_movies.groupBy('genre') \
                                .agg(f.min('year').alias('first_appearance'))

result.show(truncate=False)

spark.stop()

+---------+----------------+
|genre    |first_appearance|
+---------+----------------+
|Animation|1908            |
|Sci-Fi   |                |
+---------+----------------+



# **Bonus**

## Load file into Pyspark DF

In [110]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.appName("CustomerGameApp").getOrCreate()

customers = spark.read.csv('cust.txt', header=False, inferSchema=True)
customers = customers.withColumnRenamed('_c0', 'customer_id') \
                     .withColumnRenamed('_c1', 'first_name') \
                     .withColumnRenamed('_c2', 'last_name') \
                     .withColumnRenamed('_c3', 'age') \
                     .withColumnRenamed('_c4', 'occupation')

transactions = spark.read.csv('trans.txt', header=False, inferSchema=True)
transactions = transactions.withColumnRenamed('_c0', 'transaction_id') \
                           .withColumnRenamed('_c1', 'date') \
                           .withColumnRenamed('_c2', 'customer_id') \
                           .withColumnRenamed('_c3', 'amount') \
                           .withColumnRenamed('_c4', 'game_type') \
                           .withColumnRenamed('_c5', 'category') \
                           .withColumnRenamed('_c6', 'product') \
                           .withColumnRenamed('_c7', 'city') \
                           .withColumnRenamed('_c8', 'state') \
                           .withColumnRenamed('_c9', 'payment_method')

## 1. Show IDs and All Game Types Played by Customers Who Play "Water Sports"

In [111]:
trans_with_customer = transactions.join(customers, on='customer_id')

water_sports_players = trans_with_customer.filter(f.col('game_type') == 'Water Sports') \
                                          .groupBy('customer_id').agg(f.collect_set('game_type').alias('game_types'))

water_sports_players.show(truncate=False)


+-----------+--------------+
|customer_id|game_types    |
+-----------+--------------+
|4000009    |[Water Sports]|
|4000001    |[Water Sports]|
|4000006    |[Water Sports]|
|4000008    |[Water Sports]|
|4000004    |[Water Sports]|
|4000003    |[Water Sports]|
|4000002    |[Water Sports]|
+-----------+--------------+



## 2. Show IDs and Number of Transactions of Each Customer

In [112]:
transaction_count = transactions.groupBy('customer_id').agg(f.count('game_type').alias('num_transactions'))

transaction_count.show(truncate=False)

+-----------+----------------+
|customer_id|num_transactions|
+-----------+----------------+
|4000009    |6               |
|4000001    |8               |
|4000006    |5               |
|4000005    |5               |
|4000008    |10              |
|4000004    |5               |
|4000003    |3               |
|4000010    |6               |
|4000007    |6               |
|4000002    |6               |
+-----------+----------------+



## 3. Show IDs and Number of Transactions of Each Customer, Sorted by Customer ID

In [113]:
transaction_count_sorted = transaction_count.orderBy('customer_id')

transaction_count_sorted.show(truncate=False)

+-----------+----------------+
|customer_id|num_transactions|
+-----------+----------------+
|4000001    |8               |
|4000002    |6               |
|4000003    |3               |
|4000004    |5               |
|4000005    |5               |
|4000006    |5               |
|4000007    |6               |
|4000008    |10              |
|4000009    |6               |
|4000010    |6               |
+-----------+----------------+



## 4. Show IDs and Total Cost of Transactions of Each Customer, Sorted by Total Cost

In [114]:
total_cost_per_customer = transactions.groupBy('customer_id').agg(f.sum('amount').alias('total_cost'))

total_cost_sorted = total_cost_per_customer.orderBy(f.col('total_cost'), ascending=False)

total_cost_sorted.show(truncate=False)

+-----------+------------------+
|customer_id|total_cost        |
+-----------+------------------+
|4000008    |859.42            |
|4000002    |706.97            |
|4000007    |699.5500000000001 |
|4000001    |651.05            |
|4000006    |539.38            |
|4000003    |527.5899999999999 |
|4000009    |457.83            |
|4000010    |447.09000000000003|
|4000004    |337.06            |
|4000005    |325.15            |
+-----------+------------------+



## 5. Show ID, Number of Transactions, and Total Cost for Each Customer, Sorted by Customer ID

In [115]:
customer_stats = transactions.groupBy('customer_id').agg(
    f.count('game_type').alias('num_transactions'),
    f.sum('amount').alias('total_cost')
)

customer_stats_sorted = customer_stats.orderBy('customer_id')

customer_stats_sorted.show(truncate=False)

+-----------+----------------+------------------+
|customer_id|num_transactions|total_cost        |
+-----------+----------------+------------------+
|4000001    |8               |651.05            |
|4000002    |6               |706.97            |
|4000003    |3               |527.5899999999999 |
|4000004    |5               |337.06            |
|4000005    |5               |325.15            |
|4000006    |5               |539.38            |
|4000007    |6               |699.5500000000001 |
|4000008    |10              |859.42            |
|4000009    |6               |457.83            |
|4000010    |6               |447.09000000000003|
+-----------+----------------+------------------+



## 6. Show Name, Number of Transactions, and Total Cost for Each Customer, Sorted by Total Cost

In [116]:
customer_info = customer_stats.join(customers, on='customer_id')

sorted_customer_info = customer_info.orderBy(f.col('total_cost'), ascending=False)

sorted_customer_info.select('first_name', 'last_name', 'num_transactions', 'total_cost').show(truncate=False)

+----------+----------+----------------+------------------+
|first_name|last_name |num_transactions|total_cost        |
+----------+----------+----------------+------------------+
|Hazel     |Bender    |10              |859.42            |
|Paige     |Chen      |6               |706.97            |
|Elsie     |Hamilton  |6               |699.5500000000001 |
|Kristina  |Chung     |8               |651.05            |
|Patrick   |Song      |5               |539.38            |
|Sherri    |Melton    |3               |527.5899999999999 |
|Malcolm   |Wagner    |6               |457.83            |
|Dolores   |McLaughlin|6               |447.09000000000003|
|Gretchen  |Hill      |5               |337.06            |
|Karen     |Puckett   |5               |325.15            |
+----------+----------+----------------+------------------+



## 7. Show ID, Name, Game Types Played by Each Customer

In [117]:
customer_game_types = transactions.join(customers, on='customer_id') \
    .groupBy('customer_id', 'first_name', 'last_name') \
    .agg(f.collect_set('game_type').alias('game_types'))


customer_game_types.show(truncate=False)

+-----------+----------+----------+------------------------------------------------------------------------------------------------+
|customer_id|first_name|last_name |game_types                                                                                      |
+-----------+----------+----------+------------------------------------------------------------------------------------------------+
|4000002    |Paige     |Chen      |[Team Sports, Water Sports, Outdoor Recreation, Exercise & Fitness]                             |
|4000001    |Kristina  |Chung     |[Combat Sports, Water Sports, Outdoor Recreation, Gymnastics, Winter Sports, Exercise & Fitness]|
|4000008    |Hazel     |Bender    |[Team Sports, Water Sports, Outdoor Recreation, Games, Outdoor Play Equipment]                  |
|4000005    |Karen     |Puckett   |[Puzzles, Team Sports, Air Sports, Exercise & Fitness, Outdoor Play Equipment]                  |
|4000007    |Elsie     |Hamilton  |[Team Sports, Outdoor Recreation, 

## 8. Show ID, Name, Game Types of All Players Who Play 5 or More Game Types

In [118]:
players_with_5_or_more_games = customer_game_types.filter(f.size(f.col('game_types')) >= 5)

players_with_5_or_more_games.show(truncate=False)

+-----------+----------+----------+------------------------------------------------------------------------------------------------+
|customer_id|first_name|last_name |game_types                                                                                      |
+-----------+----------+----------+------------------------------------------------------------------------------------------------+
|4000001    |Kristina  |Chung     |[Combat Sports, Water Sports, Outdoor Recreation, Gymnastics, Winter Sports, Exercise & Fitness]|
|4000008    |Hazel     |Bender    |[Team Sports, Water Sports, Outdoor Recreation, Games, Outdoor Play Equipment]                  |
|4000005    |Karen     |Puckett   |[Puzzles, Team Sports, Air Sports, Exercise & Fitness, Outdoor Play Equipment]                  |
|4000010    |Dolores   |McLaughlin|[Team Sports, Jumping, Gymnastics, Games, Exercise & Fitness]                                   |
|4000009    |Malcolm   |Wagner    |[Combat Sports, Water Sports, Indo

## 9. Show Name of All Distinct Players of Each Game Type

In [119]:
distinct_players_by_game = transactions.join(customers, on='customer_id') \
    .groupBy('game_type') \
    .agg(f.collect_set('first_name').alias('players'))


distinct_players_by_game.show(truncate=False)

+----------------------+------------------------------------------------------------+
|game_type             |players                                                     |
+----------------------+------------------------------------------------------------+
|Gymnastics            |[Kristina, Dolores, Sherri, Malcolm]                        |
|Winter Sports         |[Kristina, Patrick]                                         |
|Jumping               |[Dolores, Patrick]                                          |
|Team Sports           |[Hazel, Elsie, Paige, Dolores, Karen]                       |
|Air Sports            |[Karen]                                                     |
|Indoor Games          |[Gretchen, Malcolm]                                         |
|Games                 |[Hazel, Dolores]                                            |
|Outdoor Play Equipment|[Hazel, Karen, Patrick, Malcolm]                            |
|Water Sports          |[Kristina, Hazel, Paige, Sherr

## 10. Show All Game Types Which Don’t Have Players Under 40

In [120]:
players_above_40 = transactions.join(customers, on='customer_id') \
    .filter(f.col('age') >= 40)

game_types_without_players_under_40 = players_above_40.groupBy('game_type').agg(f.collect_set('first_name').alias('players'))

game_types_without_players_under_40.show(truncate=False)


+----------------------+-------------------------------------------+
|game_type             |players                                    |
+----------------------+-------------------------------------------+
|Gymnastics            |[Kristina, Dolores]                        |
|Winter Sports         |[Kristina, Patrick]                        |
|Jumping               |[Dolores, Patrick]                         |
|Team Sports           |[Hazel, Elsie, Paige, Dolores, Karen]      |
|Air Sports            |[Karen]                                    |
|Indoor Games          |[Gretchen]                                 |
|Games                 |[Hazel, Dolores]                           |
|Outdoor Play Equipment|[Hazel, Karen, Patrick]                    |
|Water Sports          |[Kristina, Hazel, Paige, Gretchen, Patrick]|
|Puzzles               |[Karen]                                    |
|Outdoor Recreation    |[Kristina, Elsie, Hazel, Paige, Gretchen]  |
|Combat Sports         |[Kristina]

## 11. Show Min, Max, Average Age of Players of All Game Types

In [121]:
age_stats_per_game_type = transactions.join(customers, on='customer_id') \
    .groupBy('game_type') \
    .agg(
        f.min('age').alias('min_age'),
        f.max('age').alias('max_age'),
        f.avg('age').alias('avg_age')
    )

age_stats_per_game_type.show(truncate=False)

+----------------------+-------+-------+------------------+
|game_type             |min_age|max_age|avg_age           |
+----------------------+-------+-------+------------------+
|Gymnastics            |34     |60     |47.0              |
|Winter Sports         |42     |55     |48.5              |
|Jumping               |42     |60     |54.0              |
|Team Sports           |43     |74     |62.875            |
|Air Sports            |74     |74     |74.0              |
|Indoor Games          |39     |66     |48.0              |
|Games                 |60     |63     |62.0              |
|Outdoor Play Equipment|39     |74     |52.0              |
|Water Sports          |34     |74     |57.36363636363637 |
|Puzzles               |74     |74     |74.0              |
|Outdoor Recreation    |34     |74     |53.0              |
|Combat Sports         |39     |55     |47.0              |
|Exercise & Fitness    |43     |74     |62.142857142857146|
+----------------------+-------+-------+