In [1]:
pip install pyspark

Collecting pyspark
  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
Collecting py4j==0.10.9
  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... done
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=bd53a8d6cf86a273977dfdd8aabb727ffcbfa08d3e22ac776be59f550c8011e1
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1


We will see how to use Spark with Hive, in particular:

– how to create and use Hive databases

– how to create Hive tables

– how to load data to Hive tables

– how to insert data into Hive tables

– how to read data from Hive tables



In [3]:
import os
os.listdir(os.getcwd())

['.config', 'sample_data']

In [6]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

In [7]:
os.listdir(os.getcwd())

['.config', 'sample_data']

In [8]:
spark.sql('show databases').show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [9]:
spark.sql('show tables').show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+



We can list the functions available in Spark SQL with the command below.
At the time of writing (PySpark 3.1.1), it returns 351 functions.

In [10]:
fncs =  spark.sql('show functions').collect()
len(fncs)

351

In [11]:
for i in fncs[100:111]:
    print(i[0])

date_part
date_sub
date_trunc
datediff
day
dayofmonth
dayofweek
dayofyear
decimal
decode
degrees


We can see what a function does and which arguments it takes by using describe function, as shown below.

In [12]:
spark.sql("describe function instr").show(truncate = False)

+-----------------------------------------------------------------------------------------------------+
|function_desc                                                                                        |
+-----------------------------------------------------------------------------------------------------+
|Function: instr                                                                                      |
|Class: org.apache.spark.sql.catalyst.expressions.StringInstr                                         |
|Usage: instr(str, substr) - Returns the (1-based) index of the first occurrence of `substr` in `str`.|
+-----------------------------------------------------------------------------------------------------+
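
Note that the index returned by instr is 1-based, whereas Python's str.find is 0-based. As a rough plain-Python analogy (this is not Spark code; instr_like is a hypothetical helper name, and the sketch ignores how Spark treats NULL arguments), the behaviour can be pictured as:

```python
def instr_like(text: str, substr: str) -> int:
    """Plain-Python analogy of SQL instr: 1-based position, 0 when absent."""
    # str.find returns a 0-based index, or -1 when substr is not found,
    # so adding 1 yields a 1-based position and 0 for "not found".
    return text.find(substr) + 1


print(instr_like("SparkSQL", "SQL"))   # 6
print(instr_like("SparkSQL", "Hive"))  # 0
```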



In [13]:
spark.sql('create database movies')

DataFrame[]

In [14]:
spark.sql('show databases').show()

+---------+
|namespace|
+---------+
|  default|
|   movies|
+---------+



In [15]:
! wget http://files.grouplens.org/datasets/movielens/ml-latest.zip

--2021-05-02 15:34:28--  http://files.grouplens.org/datasets/movielens/ml-latest.zip
Resolving files.grouplens.org (files.grouplens.org)... 128.101.65.152
Connecting to files.grouplens.org (files.grouplens.org)|128.101.65.152|:80... connected.
HTTP request sent, awaiting response... 200 OK
Length: 277113433 (264M) [application/zip]
Saving to: ‘ml-latest.zip’


2021-05-02 15:34:32 (73.4 MB/s) - ‘ml-latest.zip’ saved [277113433/277113433]



In [None]:
!unzip /content/ml-latest.zip

In [16]:
spark.sql('use movies')

DataFrame[]

The movies dataset has movieId, title and genres fields. 

The ratings dataset, on the other hand, has userId, movieId, rating and timestamp fields.

Now, let’s create the tables.

Please refer to the Hive manual for details on how to create tables and load/insert data into the tables.

In [17]:
spark.sql('create table movies \
         (movieId int,title string,genres string) \
         row format delimited fields terminated by ","\
         stored as textfile')                                              # in textfile format

spark.sql("create table ratings\
           (userId int,movieId int,rating float,timestamp string)\
           stored as ORC" )                                                # in ORC format

DataFrame[]
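
One caveat with fields terminated by ",": a delimited text table splits on every comma and knows nothing about CSV quoting, so a quoted title that contains a comma would end up spread over more than one column. The plain-Python sketch below illustrates the difference on a sample line written in the style of movies.csv (the sample line and the variable names are assumptions for illustration, not taken from the notebook's output).

```python
import csv
import io

# Hypothetical sample line: a quoted title that itself contains a comma.
line = '11,"American President, The (1995)",Comedy|Drama|Romance'

# Splitting on every comma, which is what a comma-delimited text table does.
naive_fields = line.split(",")

# A CSV-aware parser keeps the quoted title together.
csv_fields = next(csv.reader(io.StringIO(line)))

print(naive_fields)  # 4 fields; the title is cut in two
print(csv_fields)    # 3 fields; the title is intact
```

Reading such a file with spark.read.csv, which understands quoting, avoids this problem.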

Let’s create another table in AVRO format. 

We will insert the count of movies by genres into it later.

In [18]:
spark.sql("create table genres_by_count\
           ( genres string,count int)\
           stored as AVRO" )                                               # in AVRO format

DataFrame[]

In [19]:
spark.sql("show tables").show()

+--------+---------------+-----------+
|database|      tableName|isTemporary|
+--------+---------------+-----------+
|  movies|genres_by_count|      false|
|  movies|         movies|      false|
|  movies|        ratings|      false|
+--------+---------------+-----------+



In [20]:
spark.sql("describe formatted ratings").show(truncate = False)

+----------------------------+------------------------------------------------+-------+
|col_name                    |data_type                                       |comment|
+----------------------------+------------------------------------------------+-------+
|userId                      |int                                             |null   |
|movieId                     |int                                             |null   |
|rating                      |float                                           |null   |
|timestamp                   |string                                          |null   |
|                            |                                                |       |
|# Detailed Table Information|                                                |       |
|Database                    |movies                                          |       |
|Table                       |ratings                                         |       |
|Owner                       |ro

In [22]:
spark.sql("load data local inpath '/content/ml-latest/movies.csv' overwrite into table movies")   # load the extracted CSV, not the zip archive

DataFrame[]

In [23]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType
schema = StructType([
             StructField('userId', IntegerType()),
             StructField('movieId', IntegerType()),
             StructField('rating', DoubleType()),
             StructField('timestamp', StringType())
            ])

In [36]:
ratings_df = spark.read.csv("/content/ml-latest/ratings.csv", schema = schema, header = True)

In [37]:
ratings_df.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
+------+-------+------+----------+
only showing top 5 rows



In [38]:
ratings_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: string (nullable = true)



A second way to create a DataFrame is to read the file in as an RDD and convert it, either with the RDD's toDF method or with createDataFrame from the SparkSession. To use toDF here, we map each line to a Row from pyspark.sql so that the column names can be inferred.

In [39]:
from pyspark.sql import Row

sc = spark.sparkContext   # reuse the SparkContext behind the existing SparkSession

rdd = sc.textFile("/content/ml-latest/ratings.csv")
header = rdd.first()
ratings_df2 = rdd.filter(lambda line: line != header).map(lambda line: Row(userId = int(line.split(",")[0]),
                                                                     movieId = int(line.split(",")[1]),
                                                                     rating = float(line.split(",")[2]),
                                                                     timestamp = line.split(",")[3]
                                                                    )).toDF()

In [40]:
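# another way: cast each field to the type the schema expects, then apply the schema
rdd2 = (rdd.filter(lambda line: line != header)
           .map(lambda line: line.split(","))
           .map(lambda f: (int(f[0]), int(f[1]), float(f[2]), f[3])))
ratings_df2_b = spark.createDataFrame(rdd2, schema = schema)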
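
Because the function handed to map is ordinary Python, the per-line parsing can be tried on a sample line before it runs over the whole RDD. A minimal sketch, where parse_rating_line is a hypothetical helper and a plain dict stands in for pyspark.sql.Row so that the snippet runs without Spark:

```python
def parse_rating_line(line: str) -> dict:
    """Split one ratings.csv data line once and cast each field."""
    user_id, movie_id, rating, timestamp = line.split(",")
    return {
        "userId": int(user_id),
        "movieId": int(movie_id),
        "rating": float(rating),
        "timestamp": timestamp,  # kept as a string, as in the schema above
    }


# First data row shown in the ratings_df output above.
sample = "1,307,3.5,1256677221"
parsed = parse_rating_line(sample)
print(parsed)
```

With Spark available, the same function could return Row(**fields) instead of a dict and be passed to rdd.map.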

In [41]:
ratings_df2.printSchema()

root
 |-- userId: long (nullable = true)
 |-- movieId: long (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: string (nullable = true)



In [42]:
ratings_df2.show(5)

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
+------+-------+------+----------+
only showing top 5 rows



In [44]:
spark.sql("load data local inpath '/content/ml-latest/movies.csv' overwrite into table movies")

DataFrame[]

In [46]:
spark.sql("select * from movies limit 10").show(truncate = False)

+-------+----------------------------------+-------------------------------------------+
|movieId|title                             |genres                                     |
+-------+----------------------------------+-------------------------------------------+
|null   |title                             |genres                                     |
|1      |Toy Story (1995)                  |Adventure|Animation|Children|Comedy|Fantasy|
|2      |Jumanji (1995)                    |Adventure|Children|Fantasy                 |
|3      |Grumpier Old Men (1995)           |Comedy|Romance                             |
|4      |Waiting to Exhale (1995)          |Comedy|Drama|Romance                       |
|5      |Father of the Bride Part II (1995)|Comedy                                     |
|6      |Heat (1995)                       |Action|Crime|Thriller                      |
|7      |Sabrina (1995)                    |Comedy|Romance                             |
|8      |Tom and Huck

In [52]:
spark.sql("select genres, count(*) as count from movies\
          group by genres\
          having count(*) > 500 \
          order by count desc").show()

+--------------------+-----+
|              genres|count|
+--------------------+-----+
|               Drama| 7069|
|              Comedy| 4735|
|  (no genres listed)| 4135|
|         Documentary| 3777|
|        Comedy|Drama| 1879|
|       Drama|Romance| 1754|
|      Comedy|Romance| 1323|
|              Horror| 1308|
|Comedy|Drama|Romance|  856|
|      Drama|Thriller|  736|
|         Crime|Drama|  734|
|            Thriller|  732|
|     Horror|Thriller|  692|
|           Animation|  595|
|           Drama|War|  519|
+--------------------+-----+
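
Note that the query groups by the whole genres string, so "Comedy|Romance" is counted separately from "Comedy". The plain-Python sketch below, using the genres values of the first seven movies listed earlier, contrasts counting by combination with counting by individual genre (the variable names are illustrative, and this is not how the notebook computes its table):

```python
from collections import Counter

# genres values of movieId 1-7, copied from the `select * from movies` output
genres_column = [
    "Adventure|Animation|Children|Comedy|Fantasy",
    "Adventure|Children|Fantasy",
    "Comedy|Romance",
    "Comedy|Drama|Romance",
    "Comedy",
    "Action|Crime|Thriller",
    "Comedy|Romance",
]

# What `group by genres` does: one bucket per distinct string.
by_combination = Counter(genres_column)

# Alternative: split on "|" and count each genre on its own.
by_genre = Counter(g for value in genres_column for g in value.split("|"))

print(by_combination["Comedy|Romance"], by_combination["Comedy"])  # 2 1
print(by_genre["Comedy"], by_genre["Romance"])                      # 5 3
```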



In [53]:
spark.sql("insert into table genres_by_count \
          select genres, count(*) as count from movies\
          group by genres\
          having count(*) >= 500 \
          order by count desc")

DataFrame[]

In [54]:
spark.sql("select * from genres_by_count order by count desc limit 3").show()

+------------------+-----+
|            genres|count|
+------------------+-----+
|             Drama| 7069|
|            Comedy| 4735|
|(no genres listed)| 4135|
+------------------+-----+



In [55]:
schema = StructType([
             StructField('userId', IntegerType()),
             StructField('movieId', IntegerType()),
             StructField('tag', StringType()),
             StructField('timestamp', StringType())
            ])

tags_df = spark.read.csv("/content/ml-latest/tags.csv", schema = schema, header = True)
tags_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- tag: string (nullable = true)
 |-- timestamp: string (nullable = true)



Next, register the DataFrame as a temporary view so that it can be queried with SQL.

In [56]:
tags_df.createOrReplaceTempView('tags_df_table')   # registerTempTable is deprecated since Spark 2.0

In [57]:
spark.sql('show tables').show()

+--------+---------------+-----------+
|database|      tableName|isTemporary|
+--------+---------------+-----------+
|  movies|genres_by_count|      false|
|  movies|         movies|      false|
|  movies|        ratings|      false|
|        |  tags_df_table|       true|
+--------+---------------+-----------+

