In [1]:
import findspark
findspark.init('/opt/spark')

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder\
    .enableHiveSupport()\
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
spark.sql("SHOW DATABASES").show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [5]:
spark.sql("CREATE DATABASE emp")

DataFrame[]

In [6]:
spark.sql("SHOW DATABASES").show()

+---------+
|namespace|
+---------+
|  default|
|      emp|
+---------+



In [7]:
spark.sql("DROP DATABASE emp")

DataFrame[]

In [8]:
spark.sql("show tables").show()

[Stage 0:>                                                          (0 + 1) / 1]

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|      src|      false|
+---------+---------+-----------+



                                                                                

In [9]:
spark.sql("use default")

DataFrame[]

In [10]:
spark.sql("show tables").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|      src|      false|
+---------+---------+-----------+



In [11]:
spark.sql("select * from src").show()

[Stage 2:>                                                          (0 + 1) / 1]

+---+-------+
|key|  value|
+---+-------+
|238|val_238|
| 86| val_86|
|311|val_311|
| 27| val_27|
|165|val_165|
|409|val_409|
|255|val_255|
|278|val_278|
| 98| val_98|
|484|val_484|
|265|val_265|
|193|val_193|
|401|val_401|
|150|val_150|
|273|val_273|
|224|val_224|
|369|val_369|
| 66| val_66|
|128|val_128|
|213|val_213|
+---+-------+
only showing top 20 rows



                                                                                

In [None]:
! wget http://files.grouplens.org/datasets/movielens/ml-latest.zip

In [13]:
spark.sql('create database movies')

AnalysisException: [SCHEMA_ALREADY_EXISTS] Cannot create schema `movies` because it already exists.
Choose a different name, drop the existing schema, or add the IF NOT EXISTS clause to tolerate pre-existing schema.

In [15]:
spark.sql("SHOW DATABASES").show()

+---------+
|namespace|
+---------+
|  default|
|   movies|
+---------+



In [16]:
spark.sql('use movies')

DataFrame[]

In [17]:
spark.sql('create table IF NOT EXISTS movies \
         (movieId int,title string,genres string) \
         row format delimited fields terminated by ","\
         stored as textfile')                                              # in textfile format
spark.sql("create table IF NOT EXISTS ratings\
           (userId int,movieId int,rating float,timestamp string)\
           stored as ORC" )                                                # in ORC format

DataFrame[]

In [18]:
spark.sql("show tables").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|   movies|   movies|      false|
|   movies|  ratings|      false|
+---------+---------+-----------+



In [19]:
spark.sql("create table IF NOT EXISTS genres_by_count\
           (genres string,count int)")

24/02/28 23:56:40 WARN ResolveSessionCatalog: A Hive serde table will be created as there is no table provider specified. You can set spark.sql.legacy.createHiveTableByDefault to false so that native data source table will be created instead.


DataFrame[]

In [20]:
spark.sql("show tables").show()

+---------+---------------+-----------+
|namespace|      tableName|isTemporary|
+---------+---------------+-----------+
|   movies|genres_by_count|      false|
|   movies|         movies|      false|
|   movies|        ratings|      false|
+---------+---------------+-----------+



In [21]:
spark.sql("describe formatted ratings").show(truncate = False)

+----------------------------+---------------------------------------------------+-------+
|col_name                    |data_type                                          |comment|
+----------------------------+---------------------------------------------------+-------+
|userId                      |int                                                |null   |
|movieId                     |int                                                |null   |
|rating                      |float                                              |null   |
|timestamp                   |string                                             |null   |
|                            |                                                   |       |
|# Detailed Table Information|                                                   |       |
|Catalog                     |spark_catalog                                      |       |
|Database                    |movies                                             |       |

In [22]:
spark.sql("load data local inpath './ml-latest/movies.csv'\
                 overwrite into table movies")

DataFrame[]

In [23]:
from pyspark.sql.types import *
schema = StructType([
             StructField('userId', IntegerType()),
             StructField('movieId', IntegerType()),
             StructField('rating', DoubleType()),
             StructField('timestamp', StringType())
            ])

In [24]:
ratings_df = spark.read.csv("/user/jupyter/data/ratings.csv", schema = schema, header = True)

In [25]:
ratings_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: string (nullable = true)



In [26]:
ratings_df.show(5)

[Stage 7:>                                                          (0 + 1) / 1]

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|      1|   4.0|1225734739|
|     1|    110|   4.0|1225865086|
|     1|    158|   4.0|1225733503|
|     1|    260|   4.5|1225735204|
|     1|    356|   5.0|1225735119|
+------+-------+------+----------+
only showing top 5 rows



                                                                                

In [27]:
ratings_df.createOrReplaceTempView("ratings_df_table") 

In [28]:
spark.sql("insert into table ratings select * from ratings_df_table")

                                                                                

DataFrame[]

In [29]:
spark.sql("select * from movies limit 10").show(truncate = False)

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|null   |title                             |genres                                     |
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
|6      |Heat (1995)                       |Action|Crime|Thriller                      |
|7      |Sabrina (1995)                    |Comedy|Romance                             |
|8      |Tom and Huck

In [30]:
spark.sql("select * from ratings limit 10").show(truncate = False)

+------+-------+------+----------+
|userId|movieId|rating|timestamp |
+------+-------+------+----------+
|48744 |54503  |3.5   |1477820783|
|48744 |54995  |4.0   |1478372524|
|48744 |57669  |4.0   |1477820784|
|48744 |60126  |3.0   |1478372854|
|48744 |60756  |2.5   |1478450823|
|48744 |61024  |3.5   |1478372866|
|48744 |61132  |4.0   |1478372465|
|48744 |63189  |3.0   |1478371836|
|48744 |68952  |4.0   |1478373478|
|48744 |69945  |0.5   |1478450809|
+------+-------+------+----------+



In [31]:
spark.sql("select genres, count(*) as count from movies\
          group by genres\
          having count(*) > 500 \
          order by count desc").show()



+--------------------+-----+
|              genres|count|
+--------------------+-----+
|               Drama|10902|
|         Documentary| 7493|
|              Comedy| 6988|
|  (no genres listed)| 6843|
|        Comedy|Drama| 2855|
|       Drama|Romance| 2504|
|              Horror| 2318|
|      Comedy|Romance| 1976|
|            Thriller| 1318|
|Comedy|Drama|Romance| 1158|
|     Horror|Thriller| 1138|
|      Drama|Thriller| 1133|
|           Animation| 1085|
|         Crime|Drama| 1031|
|              Action|  745|
|           Drama|War|  698|
|        Action|Drama|  654|
|             Western|  608|
|             Romance|  587|
|Crime|Drama|Thriller|  581|
+--------------------+-----+
only showing top 20 rows



                                                                                

In [32]:
spark.sql("insert into table genres_by_count \
          select genres, count(*) as count from movies\
          group by genres\
          having count(*) >= 500 \
          order by count desc")

DataFrame[]

In [33]:
spark.sql("select * from genres_by_count order by count desc limit 3").show()

+-----------+-----+
|     genres|count|
+-----------+-----+
|      Drama|10902|
|Documentary| 7493|
|     Comedy| 6988|
+-----------+-----+



In [34]:
schema = StructType([
             StructField('userId', IntegerType()),
             StructField('movieId', IntegerType()),
             StructField('tag', StringType()),
             StructField('timestamp', StringType())
            ])

tags_df = spark.read.csv("/user/jupyter/data/tags.csv", schema = schema, header = True)
tags_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: string (nullable = true)



In [35]:
tags_df.registerTempTable('tags_df_table')



In [36]:
spark.sql('show tables').show()

+---------+----------------+-----------+
|namespace|       tableName|isTemporary|
+---------+----------------+-----------+
|   movies| genres_by_count|      false|
|   movies|          movies|      false|
|   movies|         ratings|      false|
|         |ratings_df_table|       true|
|         |   tags_df_table|       true|
+---------+----------------+-----------+



In [37]:
joined = spark.sql("select m.title, m.genres, r.movieId, r.userId,  r.rating, r.timestamp as ratingTimestamp, \
               t.tag, t.timestamp as tagTimestamp from ratings as r inner join tags_df_table as t\
               on r.movieId = t.movieId and r.userId = t.userId inner join movies as m on r.movieId = m.movieId")
type(joined)



pyspark.sql.dataframe.DataFrame

In [38]:
joined.select(['title','genres','rating']).show(5, truncate = False)

[Stage 30:>                                                         (0 + 1) / 1]

+----------------+-------------------------------------------+------+
|title           |genres                                     |rating|
+----------------+-------------------------------------------+------+
|Toy Story (1995)|Adventure|Animation|Children|Comedy|Fantasy|4.0   |
|Toy Story (1995)|Adventure|Animation|Children|Comedy|Fantasy|4.0   |
|Toy Story (1995)|Adventure|Animation|Children|Comedy|Fantasy|4.0   |
|Toy Story (1995)|Adventure|Animation|Children|Comedy|Fantasy|4.0   |
|Toy Story (1995)|Adventure|Animation|Children|Comedy|Fantasy|4.0   |
+----------------+-------------------------------------------+------+
only showing top 5 rows



                                                                                

In [39]:
spark.stop()