In [7]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

conf = SparkConf().setAppName("spark_join")\
                  .setMaster("spark://workspace:7077")
sparkContext = SparkContext(conf=conf)
spark = SparkSession(sparkContext=sparkContext)

In [2]:
csv_dir = "file:///home/hooniegit/git/study/rdd-manufacture/data/movielens/ml-latest"
genome_scores = spark.read.csv(f"{csv_dir}/genome-scores.csv", header=True, inferSchema=True)
genome_tags = spark.read.csv(f"{csv_dir}/genome-tags.csv", header=True, inferSchema=True)
links = spark.read.csv(f"{csv_dir}/links.csv", header=True, inferSchema=True)
movies = spark.read.csv(f"{csv_dir}/movies.csv", header=True, inferSchema=True)
ratings = spark.read.csv(f"{csv_dir}/ratings.csv", header=True, inferSchema=True)
tags = spark.read.csv(f"{csv_dir}/tags.csv", header=True, inferSchema=True)

genome_scores.createOrReplaceTempView("genome_scores")
genome_tags.createOrReplaceTempView("genome_tags")
links.createOrReplaceTempView("links")
movies.createOrReplaceTempView("movies")
ratings.createOrReplaceTempView("ratings")
tags.createOrReplaceTempView("tags")

                                                                                

In [3]:
df_1 = spark.sql(
    """
        SELECT
            *
        FROM
            movies m
        INNER JOIN
            ratings r
        ON
            m.movieId = r.movieId
        ORDER BY
            m.movieId DESC  
    """
)
df_1.show()



+-------+--------------------+--------------------+------+-------+------+----------+
|movieId|               title|              genres|userId|movieId|rating| timestamp|
+-------+--------------------+--------------------+------+-------+------+----------+
| 288983|UNZIPPED: An Auto...|         Documentary|254114| 288983|   3.0|1689834886|
| 288977|Skinford: Death S...|      Crime|Thriller|291389| 288977|   3.0|1689815902|
| 288975|The Men Who Made ...|         Documentary|154483| 288975|   4.0|1689812351|
| 288971|  Ouija Japan (2021)|       Action|Horror| 98408| 288971|   0.5|1689798322|
| 288967|State of Siege: T...|        Action|Drama| 47791| 288967|   3.5|1689748357|
| 288965|     Камертон (1979)|             Romance|167321| 288965|   2.5|1689747309|
| 288959|Letters Of Happin...|      Children|Drama|180878| 288959|   2.0|1689723318|
| 288957|Ballet Of Blood (...|              Horror| 11969| 288957|   1.0|1689719936|
| 288955|Agata's Friends (...|               Drama|308174| 288955

                                                                                

In [4]:
df_2 = spark.sql(
    """
        SELECT DISTINCT
            AVG(r.rating) average,
            r.userId,
            YEAR(FROM_UNIXTIME(r.timestamp)) year,
            MONTH(FROM_UNIXTIME(r.timestamp)) month,
            COUNT(*) count
        FROM
            movies m
        INNER JOIN
            ratings r
        ON
            m.movieId = r.movieId
        GROUP BY
            year,
            month,
            r.userId
        ORDER BY
            year DESC,
            month DESC,
            r.userId
    """
)
df_2.show()



+------------------+------+----+-----+-----+
|           average|userId|year|month|count|
+------------------+------+----+-----+-----+
|               4.0|    33|2023|    7|    3|
|              4.75|   203|2023|    7|    4|
|               4.5|   237|2023|    7|    3|
|               4.0|   301|2023|    7|    3|
|              3.25|   304|2023|    7|   70|
|               4.0|   498|2023|    7|    3|
|              4.25|   536|2023|    7|    2|
|3.3423423423423424|   705|2023|    7|  111|
|               4.0|   720|2023|    7|    1|
|3.1666666666666665|   897|2023|    7|    3|
|               4.0|  1811|2023|    7|    2|
|  4.43859649122807|  2038|2023|    7|   57|
|            3.8125|  2140|2023|    7|    8|
|3.5833333333333335|  2172|2023|    7|    6|
|               3.8|  2270|2023|    7|    5|
|               3.5|  2318|2023|    7|    1|
| 4.142857142857143|  2414|2023|    7|    7|
|               3.5|  2589|2023|    7|    3|
|              3.25|  2651|2023|    7|    6|
|         

                                                                                

23/12/21 18:29:25 WARN StandaloneAppClient$ClientEndpoint: Connection to workspace:7077 failed; waiting for master to reconnect...
23/12/21 18:29:25 WARN StandaloneSchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection...
23/12/21 18:29:25 WARN StandaloneAppClient$ClientEndpoint: Connection to workspace:7077 failed; waiting for master to reconnect...
23/12/21 18:29:30 ERROR TaskSchedulerImpl: Lost executor 0 on 220.118.158.128: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/12/21 18:29:30 ERROR TaskSchedulerImpl: Lost executor 9 on 220.118.158.128: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs for WARN messages.
23/12/21 18:29:30 ERROR TaskSchedulerImpl: Lost executor 8 on 220.118.158.128: Remote RPC client disassociated. Likely due to containers exceeding thresholds, or network issues. Check driver logs 

In [23]:
df_3 = spark.sql(
    """
        SELECT DISTINCT
            AVG(r.rating) average,
            r.userId,
            YEAR(FROM_UNIXTIME(r.timestamp)) year,
            MONTH(FROM_UNIXTIME(r.timestamp)) month,
            COUNT(*) count
        FROM
            movies m, ratings r
        WHERE
            m.movieId = r.movieId
        GROUP BY
            year,
            month,
            r.userId
        ORDER BY
            year DESC,
            month DESC,
            r.userId
    """
)
df_3.show()



+------------------+------+----+-----+-----+
|           average|userId|year|month|count|
+------------------+------+----+-----+-----+
|               4.0|    33|2023|    7|    3|
|              4.75|   203|2023|    7|    4|
|               4.5|   237|2023|    7|    3|
|               4.0|   301|2023|    7|    3|
|              3.25|   304|2023|    7|   70|
|               4.0|   498|2023|    7|    3|
|              4.25|   536|2023|    7|    2|
|3.3423423423423424|   705|2023|    7|  111|
|               4.0|   720|2023|    7|    1|
|3.1666666666666665|   897|2023|    7|    3|
|               4.0|  1811|2023|    7|    2|
|  4.43859649122807|  2038|2023|    7|   57|
|            3.8125|  2140|2023|    7|    8|
|3.5833333333333335|  2172|2023|    7|    6|
|               3.8|  2270|2023|    7|    5|
|               3.5|  2318|2023|    7|    1|
| 4.142857142857143|  2414|2023|    7|    7|
|               3.5|  2589|2023|    7|    3|
|              3.25|  2651|2023|    7|    6|
|         

                                                                                

In [24]:
df_4 = spark.sql(
    """
        SELECT DISTINCT
            *
        FROM
            movies m
        INNER JOIN
            genome_scores gs
        ON
            m.movieId = gs.movieId
        INNER JOIN
            genome_tags gt
        ON
            gs.tagId = gt.tagId
    """
)
df_4.show()

[Stage 44:>                                                         (0 + 1) / 1]

+-------+--------------------+--------------------+-------+-----+--------------------+-----+--------------------+
|movieId|               title|              genres|movieId|tagId|           relevance|tagId|                 tag|
+-------+--------------------+--------------------+-------+-----+--------------------+-----+--------------------+
|    967|  Outlaw, The (1943)|             Western|    967|  727| 0.19374999999999998|  727|nudity (full fron...|
|    968|Night of the Livi...|Horror|Sci-Fi|Thr...|    968|   25|              0.1895|   25|           addiction|
|    968|Night of the Livi...|Horror|Sci-Fi|Thr...|    968|   65| 0.02200000000000002|   65|               anime|
|    968|Night of the Livi...|Horror|Sci-Fi|Thr...|    968|   79|               0.272|   79|            artistic|
|    968|Night of the Livi...|Horror|Sci-Fi|Thr...|    968|  264| 0.04949999999999999|  264|       crappy sequel|
|    968|Night of the Livi...|Horror|Sci-Fi|Thr...|    968|  352|0.023749999999999993|  

                                                                                

In [30]:
df_4_2 = spark.sql(
    """
        SELECT DISTINCT
            m.movieId, 
            m.title, 
            m.genres, 
            gs.tagId, 
            gs.relevance, 
            gt.tag
        FROM
            movies m
        INNER JOIN
            genome_scores gs
        ON
            m.movieId = gs.movieId
        INNER JOIN
            genome_tags gt
        ON
            gs.tagId = gt.tagId
        ORDER BY
            movieId DESC,
            tagId DESC
    """
)
df_4_2.show()



+-------+-------------------+---------------+-----+--------------------+--------------+
|movieId|              title|         genres|tagId|           relevance|           tag|
+-------+-------------------+---------------+-----+--------------------+--------------+
| 288167|Extraction 2 (2023)|Action|Thriller| 1128|             0.03025|       zombies|
| 288167|Extraction 2 (2023)|Action|Thriller| 1127| 0.11225000000000002|        zombie|
| 288167|Extraction 2 (2023)|Action|Thriller| 1126|0.022749999999999992|          wwii|
| 288167|Extraction 2 (2023)|Action|Thriller| 1125|0.029500000000000026|         wuxia|
| 288167|Extraction 2 (2023)|Action|Thriller| 1124|             0.09875|       writing|
| 288167|Extraction 2 (2023)|Action|Thriller| 1123| 0.11449999999999999|       writers|
| 288167|Extraction 2 (2023)|Action|Thriller| 1122|0.027500000000000024| writer's life|
| 288167|Extraction 2 (2023)|Action|Thriller| 1121|0.026249999999999996|  world war ii|
| 288167|Extraction 2 (2023)|Act

                                                                                

In [31]:
df_4_3 = spark.sql(
    """
        SELECT DISTINCT
            m.movieId,
            m.title,
            m.genres,
            g.tagId,
            g.relevance,
            g.tag
        FROM
            movies m
        INNER JOIN
            (
                SELECT
                    gs.movieId,
                    gs.tagId,
                    gs.relevance,
                    gt.tag
                FROM
                    genome_scores gs
                INNER JOIN
                    genome_tags gt
                ON
                    gs.tagId = gt.tagId
            ) g
        ON
            m.movieId = g.movieId
        ORDER BY
            movieId DESC,
            tagId DESC
    """
)
df_4_3.show()



+-------+-------------------+---------------+-----+--------------------+--------------+
|movieId|              title|         genres|tagId|           relevance|           tag|
+-------+-------------------+---------------+-----+--------------------+--------------+
| 288167|Extraction 2 (2023)|Action|Thriller| 1128|             0.03025|       zombies|
| 288167|Extraction 2 (2023)|Action|Thriller| 1127| 0.11225000000000002|        zombie|
| 288167|Extraction 2 (2023)|Action|Thriller| 1126|0.022749999999999992|          wwii|
| 288167|Extraction 2 (2023)|Action|Thriller| 1125|0.029500000000000026|         wuxia|
| 288167|Extraction 2 (2023)|Action|Thriller| 1124|             0.09875|       writing|
| 288167|Extraction 2 (2023)|Action|Thriller| 1123| 0.11449999999999999|       writers|
| 288167|Extraction 2 (2023)|Action|Thriller| 1122|0.027500000000000024| writer's life|
| 288167|Extraction 2 (2023)|Action|Thriller| 1121|0.026249999999999996|  world war ii|
| 288167|Extraction 2 (2023)|Act

                                                                                

In [38]:
df_all = spark.sql(
    """
        SELECT
            m.movieId,
            m.title,
            m.genres,
            l.imdbId,
            l.tmdbId,
            r.userId,
            r.rating,
            r.timestamp,
            gs.relevance,
            gs.tagId,
            gt.tag
        FROM
            movies m
        LEFT JOIN
            links l
        ON
            m.movieId = l.movieId
        LEFT JOIN
            ratings r
        ON
            m.movieId = r.movieId
        LEFT JOIN
            genome_scores gs
        ON
            m.movieId = gs.movieId
        LEFT JOIN
            genome_tags gt
        ON
            gs.tagId = gt.tagId
    """
)
df_all.show()



+-------+--------------------+-------------+------+------+------+------+----------+--------------------+-----+---------------+
|movieId|               title|       genres|imdbId|tmdbId|userId|rating| timestamp|           relevance|tagId|            tag|
+-------+--------------------+-------------+------+------+------+------+----------+--------------------+-----+---------------+
|     12|Dracula: Dead and...|Comedy|Horror|112896| 12110| 34486|   3.5|1494535157|0.028249999999999997|    1|            007|
|     12|Dracula: Dead and...|Comedy|Horror|112896| 12110| 34486|   3.5|1494535157|             0.03075|    2|   007 (series)|
|     12|Dracula: Dead and...|Comedy|Horror|112896| 12110| 34486|   3.5|1494535157|0.025500000000000023|    3|   18th century|
|     12|Dracula: Dead and...|Comedy|Horror|112896| 12110| 34486|   3.5|1494535157|0.028000000000000025|    4|          1920s|
|     12|Dracula: Dead and...|Comedy|Horror|112896| 12110| 34486|   3.5|1494535157| 0.26475000000000004|    5| 

                                                                                

In [6]:
spark.stop()