In [1]:
# Stop any SparkContext left over from an earlier run in this kernel so the
# configuration below is applied to a fresh context.
try:
    sc.stop()
except NameError:
    # `sc` has not been defined yet in this kernel.
    print("no sc to stop")
except Exception as exc:
    # Still best effort, but report what actually went wrong instead of
    # claiming there was no context to stop.
    print("could not stop existing sc: " + repr(exc))

from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession, HiveContext

# Local master with 4 worker threads.
conf = SparkConf().setAppName('SparkHiveSession').setMaster('local[4]')
sc = SparkContext.getOrCreate(conf = conf)


                 

In [2]:
# Display the SparkContext created in the previous cell.
sc

In [3]:
# hive integration with spark session
# enableHiveSupport() requests the Hive catalog explicitly instead of relying
# on the environment's default configuration to provide it.
spark = (SparkSession.builder.appName('pysparkhiveintegration')
        .config('spark.sql.warehouse.dir', '/user/hive/warehouse')
        .enableHiveSupport()
        .getOrCreate())

In [4]:
# List the databases that exist before this notebook changes anything.
spark.sql('show databases').show()

+------------+
|databaseName|
+------------+
| bucketingdb|
|   complexdb|
|     default|
|  employeedb|
|  hivetestdb|
|    hw271_db|
|    hw272_db|
|    hw273_db|
|      jsondb|
|   movies_db|
| xmldatabase|
|       xmldb|
+------------+



In [5]:
# Start from a clean slate: drop movies_db (and, via `cascade`, its tables) if it exists.
spark.sql('drop database if exists movies_db cascade').show()

++
||
++
++



In [6]:
# Recreate the (now empty) movies_db database.
spark.sql('create database if not exists movies_db').show()

++
||
++
++



In [7]:
# List the databases again to confirm movies_db is present.
spark.sql('show databases').show()

+------------+
|databaseName|
+------------+
| bucketingdb|
|   complexdb|
|     default|
|  employeedb|
|  hivetestdb|
|    hw271_db|
|    hw272_db|
|    hw273_db|
|      jsondb|
|   movies_db|
| xmldatabase|
|       xmldb|
+------------+



In [8]:
# Make movies_db the current database so later statements can use unqualified table names.
spark.sql('use movies_db').show()

++
||
++
++



In [9]:
# The freshly created database should contain no tables yet.
spark.sql('show tables').show()

+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+



In [10]:
# `movies`: comma-delimited text table with the three columns movieId, title
# and genres; the table property asks the reader to skip one header line.
# NOTE(review): a plain ',' delimiter will split any title that itself contains
# a comma — confirm the CSV has no quoted fields, or switch to a CSV SerDe.
spark.sql("""create table if not exists movies 
    (movieId int, 
     title string,
     genres string)
     row format delimited
     fields terminated by ','
     stored as textfile     
     tblproperties ("skip.header.line.count"="1")
    """).show()

# `ratings`: ORC-backed table that is filled later from a DataFrame.
# NOTE(review): `rating` is float and `timestamp` is string here, while the
# DataFrame schema used to populate this table declares DoubleType and
# IntegerType — presumably the values are cast on insert; confirm that is intended.
spark.sql("""create table if not exists ratings
            (userId int, movieId int, rating float, timestamp string)
            stored as ORC
            """).show()

++
||
++
++

++
||
++
++



In [11]:
# Confirm that the `movies` and `ratings` tables now exist in movies_db.

spark.sql("""show tables""").show()

+---------+---------+-----------+
| database|tableName|isTemporary|
+---------+---------+-----------+
|movies_db|   movies|      false|
|movies_db|  ratings|      false|
+---------+---------+-----------+



In [12]:
# Inspect the column list and the table metadata of `ratings`.
spark.sql("""describe formatted ratings""").show()

+--------------------+--------------------+-------+
|            col_name|           data_type|comment|
+--------------------+--------------------+-------+
|              userId|                 int|   null|
|             movieId|                 int|   null|
|              rating|               float|   null|
|           timestamp|              string|   null|
|                    |                    |       |
|# Detailed Table ...|                    |       |
|            Database|           movies_db|       |
|               Table|             ratings|       |
|               Owner|              hadoop|       |
|        Created Time|Thu Jan 21 21:14:...|       |
|         Last Access|Thu Jan 01 05:30:...|       |
|          Created By|         Spark 2.4.7|       |
|                Type|             MANAGED|       |
|            Provider|                hive|       |
|    Table Properties|[transient_lastDd...|       |
|            Location|hdfs://localhost:...|       |
|       Serd

In [13]:
# Load movies.csv from the local filesystem into `movies`; `overwrite` replaces
# any rows already in the table, so re-running this cell does not duplicate data.
spark.sql("""load data local inpath '/home/hadoop/data/ml-latest/movies.csv'
overwrite into table movies""").show()

++
||
++
++



In [14]:
# Preview a few loaded rows as a sanity check.
spark.sql("""select * from movies limit 5""").show()


+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+



In [15]:
#### Inserting data into the ratings table using a Spark DataFrame

In [16]:
# NOTE(review): wildcard import. The schema cell below only needs StructType,
# StructField, IntegerType and DoubleType — prefer importing those by name.
from pyspark.sql.types import *

In [23]:
# Explicit schema for ratings.csv, built one column at a time.
schema = (
    StructType()
    .add('UserId', IntegerType())
    .add('movieId', IntegerType())
    .add('rating', DoubleType())
    .add('timestamp', IntegerType())
)

In [24]:
# Read ratings.csv from the local filesystem with the explicit schema; the
# first line of the file is treated as a header.
rating_df = (
    spark.read
    .schema(schema)
    .option("header", True)
    .csv("file:///home/hadoop/data/ml-latest/ratings.csv")
)

In [25]:
# Preview the first five parsed rows.
rating_df.show(5)

+------+-------+------+----------+
|UserId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    307|   3.5|1256677221|
|     1|    481|   3.5|1256677456|
|     1|   1091|   1.5|1256677471|
|     1|   1257|   4.5|1256677460|
|     1|   1449|   4.5|1256677264|
+------+-------+------+----------+
only showing top 5 rows



In [26]:
# Expose the DataFrame to SQL under the name `rating_df_table`.
rating_df.createOrReplaceTempView("rating_df_table")

In [27]:
# Copy the rows of the temp view into the ORC-backed `ratings` table.
# `insert overwrite` (rather than `insert into`) keeps this cell idempotent:
# re-running it replaces the table contents instead of appending duplicates,
# matching the `overwrite` used when loading `movies`.
# Columns are listed explicitly so the insert does not depend on `select *`
# happening to return them in the table's column order.
spark.sql("""insert overwrite table ratings
             select UserId, movieId, rating, `timestamp`
             from rating_df_table""").show()

++
||
++
++



In [29]:
# Preview ten rows of the populated table without truncating column values.
spark.sql("""select * from ratings limit 10""").show(truncate=False)

+------+-------+------+----------+
|userId|movieId|rating|timestamp |
+------+-------+------+----------+
|51979 |5941   |2.5   |1439599249|
|51979 |5942   |3.5   |1439677197|
|51979 |5943   |2.5   |1439574637|
|51979 |5952   |4.0   |1439573853|
|51979 |5958   |3.0   |1439647221|
|51979 |5969   |1.5   |1439692109|
|51979 |5970   |3.0   |1439666647|
|51979 |5989   |4.5   |1528789425|
|51979 |5991   |3.5   |1439768072|
|51979 |5995   |3.5   |1439573967|
+------+-------+------+----------+



In [30]:
### PySpark RDD broadcast variables

In [32]:
# Lookup table from state abbreviation to display name.
states = dict(
    NY="New York",
    CA="California",
    FL="Flo Rida",
    TX="Texas",
)

In [33]:
# Wrap the lookup dictionary in a broadcast variable; tasks read it through `.value`.
broadcastStates = sc.broadcast(states)

In [34]:
# Display the broadcast handle (a Broadcast object, not the dictionary itself).
broadcastStates


<pyspark.broadcast.Broadcast at 0x7f355fe1d7b8>

In [35]:
# Print the built-in documentation for SparkContext.broadcast.
help(sc.broadcast)

Help on method broadcast in module pyspark.context:

broadcast(value) method of pyspark.context.SparkContext instance
    Broadcast a read-only variable to the cluster, returning a
    L{Broadcast<pyspark.broadcast.Broadcast>}
    object for reading it in distributed functions. The variable will
    be sent to each cluster only once.



In [37]:
# Sample records as (first name, last name, country, state code) tuples.
data = [
    ("Rock", "Shi", "USA", "CA"),
    ("Emy", "Kay", "USA", "NY"),
    ("Michael", "Rose", "USA", "NY"),
    ("Johnny", "Cash", "USA", "TX"),
    ("Joe", "Exotic", "USA", "FL"),
]

# Distribute the records as an RDD, leaving the partition count to Spark.
rdd_4 = sc.parallelize(data)

In [38]:
def state_convert(code):
    """Translate a state abbreviation into its full name.

    The lookup table is read from the ``broadcastStates`` broadcast variable.

    Args:
        code: Key to look up in the broadcast dictionary.

    Returns:
        The value stored under ``code``.

    Raises:
        KeyError: If ``code`` is not a key of the broadcast dictionary.
    """
    state_names = broadcastStates.value
    return state_names[code]

In [44]:
# Replace the state code (4th field) of every record with the full state name
# and bring the result back to the driver.
(
    rdd_4
    .map(lambda person: (person[0], person[1], person[2], state_convert(person[3])))
    .collect()
)

[('Rock', 'Shi', 'USA', 'California'),
 ('Emy', 'Kay', 'USA', 'New York'),
 ('Michael', 'Rose', 'USA', 'New York'),
 ('Johnny', 'Cash', 'USA', 'Texas'),
 ('Joe', 'Exotic', 'USA', 'Flo Rida')]

In [45]:
"Broadcast variables keep a read-only variable cached on each machine \
rather than shipping a copy of it with every task. \
They give every worker node a copy of a large input dataset \
in an efficient manner. \
You don't broadcast an RDD; you broadcast values that are sent to all executor nodes \
and used multiple times while processing the RDD."

'Broadcast variables allow to keep read only variable cached on each machine rather than shipping copy of it with tasks.'

In [46]:
# PySpark repartitioning

In [53]:
# Set the Spark SQL shuffle partition count for this session.
# NOTE(review): the RDD cells below pass their partition counts explicitly
# (parallelize(..., 6), coalesce(2)), so this setting presumably does not
# influence them — confirm it is needed here.
spark.conf.set("spark.sql.shuffle.partitions", "200")

In [55]:
# Distribute the integers 0..19 over 6 partitions.
# The previous argument `(0, 20)` was a two-element tuple, so the RDD held only
# the values 0 and 20 and most partitions were empty; `range(0, 20)` supplies
# the intended sequence. The partition count is still requested explicitly.
rdd5 = sc.parallelize(range(0, 20), 6)

print("from local[4]: " + str(rdd5.getNumPartitions()))

from local[4]: 6


In [56]:
import shutil

# saveAsTextFile refuses to write into an existing directory, so clear the
# output of any previous run first to keep this cell re-runnable.
# The master is local[4], so the file:// target lives on this machine's disk
# and can be removed with the standard library.
shutil.rmtree("/home/hadoop/parts", ignore_errors=True)
rdd5.saveAsTextFile("file:///home/hadoop/parts")

In [65]:
# Reduce the 6 partitions of rdd5 to 2.
rdd6 = rdd5.coalesce(2)

In [66]:
# Verify the partition count after coalescing.
print(rdd6.getNumPartitions())

2
