In [1]:
import findspark
findspark.init('/usr/local/spark')
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder.appName("Python Hive Connect Example") \
        .config("spark.sql.warehouse.dir","hdfs://localhost:54310/user/hive/warehouse") \
        .enableHiveSupport() \
        .getOrCreate()

In [4]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("Python RDD to DF").getOrCreate()

# DATAFRAME AND ITS OPERATIONS

In [5]:
customerdf=spark.read.csv('/home/hduser/Downloads/sharedfolder/customers.txt',sep="\t", inferSchema="True",header=True)

In [6]:
customerdf.printSchema()

root
 |-- customer_id: integer (nullable = true)
 |-- customer_name: string (nullable = true)
 |-- customer_city: string (nullable = true)
 |-- customer_state: string (nullable = true)
 |-- customer_zipcode: integer (nullable = true)



In [7]:
customerdf.show(5)

+-----------+--------------+-------------+--------------+----------------+
|customer_id| customer_name|customer_city|customer_state|customer_zipcode|
+-----------+--------------+-------------+--------------+----------------+
|      11039|   Mary Torres|       Caguas|            PR|             725|
|       5623|    Jose Haley|     Columbus|            OH|           43207|
|       5829|    Mary Smith|      Houston|            TX|           77015|
|       6336|Richard Maddox|       Caguas|            PR|             725|
|       1708|Margaret Booth|    Arlington|            TX|           76010|
+-----------+--------------+-------------+--------------+----------------+
only showing top 5 rows



In [8]:
# select command is used to select one or more column. 
# we cannot select like customerdf[column_name] : not workinh here ..so select fn is must.
customerdf.select('customer_name').show(5)

+--------------+
| customer_name|
+--------------+
|   Mary Torres|
|    Jose Haley|
|    Mary Smith|
|Richard Maddox|
|Margaret Booth|
+--------------+
only showing top 5 rows



In [9]:
# 2nd way to read /select the column 
customerdf.select(customerdf['customer_name'],customerdf["customer_city"]).show(5)

+--------------+-------------+
| customer_name|customer_city|
+--------------+-------------+
|   Mary Torres|       Caguas|
|    Jose Haley|     Columbus|
|    Mary Smith|      Houston|
|Richard Maddox|       Caguas|
|Margaret Booth|    Arlington|
+--------------+-------------+
only showing top 5 rows



In [10]:
customerdf.select('customer_name','customer_city').show(5)

+--------------+-------------+
| customer_name|customer_city|
+--------------+-------------+
|   Mary Torres|       Caguas|
|    Jose Haley|     Columbus|
|    Mary Smith|      Houston|
|Richard Maddox|       Caguas|
|Margaret Booth|    Arlington|
+--------------+-------------+
only showing top 5 rows



In [11]:
# return the column
customerdf.columns

['customer_id',
 'customer_name',
 'customer_city',
 'customer_state',
 'customer_zipcode']

In [12]:
# filter 
customerdf.filter((customerdf['customer_name']=="Mary Smith") & (customerdf['customer_city']=='Houston')).show()

+-----------+-------------+-------------+--------------+----------------+
|customer_id|customer_name|customer_city|customer_state|customer_zipcode|
+-----------+-------------+-------------+--------------+----------------+
|       5829|   Mary Smith|      Houston|            TX|           77015|
|       5987|   Mary Smith|      Houston|            TX|           77015|
+-----------+-------------+-------------+--------------+----------------+



In [13]:
# Filter using dot operator with and conditon 
customerdf.filter((customerdf.customer_name=="Mary Smith") & (customerdf.customer_city=="Houston")).show()

+-----------+-------------+-------------+--------------+----------------+
|customer_id|customer_name|customer_city|customer_state|customer_zipcode|
+-----------+-------------+-------------+--------------+----------------+
|       5829|   Mary Smith|      Houston|            TX|           77015|
|       5987|   Mary Smith|      Houston|            TX|           77015|
+-----------+-------------+-------------+--------------+----------------+



In [14]:
# group by
cust_df=customerdf.groupBy('customer_name')

In [15]:
cust_df.max('customer_zipcode').show()

+--------------+---------------------+
| customer_name|max(customer_zipcode)|
+--------------+---------------------+
| Theresa Lopez|                91306|
|   Mary Zamora|                  725|
|    Mary Moore|                95023|
| Mary Williams|                80020|
|  Mary Donovan|                79109|
| Michael Smith|                90002|
|  Mary Leonard|                  725|
|  Ashley Clark|                  725|
|  Mary Barrett|                90262|
| Susan Watkins|                85225|
|Samuel Swanson|                  725|
|    Mary Hardy|                  725|
| Kathryn Smith|                78227|
|  Mary Harding|                  725|
|Margaret Grant|                  949|
|     Mary Sims|                44035|
|Cynthia Spence|                92105|
|   Bruce Smith|                  725|
|   Mary Dawson|                  725|
|  Olivia Smith|                48212|
+--------------+---------------------+
only showing top 20 rows



In [16]:
# count of each name
df=cust_df.count()

In [17]:
df.show()

+--------------+-----+
| customer_name|count|
+--------------+-----+
| Theresa Lopez|    1|
|   Mary Zamora|    1|
|    Mary Moore|    1|
| Mary Williams|    2|
|  Mary Donovan|    1|
| Michael Smith|    6|
|  Mary Leonard|    1|
|  Ashley Clark|    1|
|  Mary Barrett|    1|
| Susan Watkins|    1|
|Samuel Swanson|    1|
|    Mary Hardy|    1|
| Kathryn Smith|    1|
|  Mary Harding|    1|
|Margaret Grant|    1|
|     Mary Sims|    2|
|Cynthia Spence|    1|
|   Bruce Smith|    1|
|   Mary Dawson|    1|
|  Olivia Smith|    2|
+--------------+-----+
only showing top 20 rows



In [18]:
# sum
cust_df.sum().show()

+--------------+----------------+---------------------+
| customer_name|sum(customer_id)|sum(customer_zipcode)|
+--------------+----------------+---------------------+
| Theresa Lopez|            1295|                91306|
|   Mary Zamora|           11019|                  725|
|    Mary Moore|            6027|                95023|
| Mary Williams|            3325|                88721|
|  Mary Donovan|            8802|                79109|
| Michael Smith|           44343|               257117|
|  Mary Leonard|            9440|                  725|
|  Ashley Clark|             236|                  725|
|  Mary Barrett|            3148|                90262|
| Susan Watkins|            5253|                85225|
|Samuel Swanson|           11980|                  725|
|    Mary Hardy|            2638|                  725|
| Kathryn Smith|            1929|                78227|
|  Mary Harding|            6052|                  725|
|Margaret Grant|            5188|               

In [19]:
# return the path of current file
!pwd

/home/hduser/Downloads


In [20]:
# Before applying the filter on count we need to save count data into one variable then we can apply  filter on that
df.filter(df["count"]>50).show()

+-------------+-----+
|customer_name|count|
+-------------+-----+
|   Mary Smith|  175|
+-------------+-----+



In [21]:
# To apply SQL query on dataframe we created the customer view
customerdf.createOrReplaceTempView("customer")

In [22]:
# To use SQL query we need to use sql.spark
spark.sql("select * from customer").show()

+-----------+----------------+-------------+--------------+----------------+
|customer_id|   customer_name|customer_city|customer_state|customer_zipcode|
+-----------+----------------+-------------+--------------+----------------+
|      11039|     Mary Torres|       Caguas|            PR|             725|
|       5623|      Jose Haley|     Columbus|            OH|           43207|
|       5829|      Mary Smith|      Houston|            TX|           77015|
|       6336|  Richard Maddox|       Caguas|            PR|             725|
|       1708|  Margaret Booth|    Arlington|            TX|           76010|
|      10227|  Mary Henderson|       Caguas|            PR|             725|
|        839|     Lisa Walker|       Caguas|            PR|             725|
|       7604|   Jonathan Hill|      Phoenix|            AZ|           85040|
|       6485|Carolyn Sheppard|Pompano Beach|            FL|           33063|
|       4737|    Mary Mendoza|       Caguas|            PR|             725|

In [23]:
spark.sql("select customer_state, count(*) from customer group by customer_state having count(*) > 50").show()

+--------------+--------+
|customer_state|count(1)|
+--------------+--------+
|            CA|     187|
|            NY|      79|
|            TX|      62|
|            PR|     505|
+--------------+--------+



In [24]:
cStateCount50 = spark.sql("select customer_state, count(*) as state_count from customer group by customer_state having state_count > 50")

In [25]:
cStateCount50.dtypes

[('customer_state', 'string'), ('state_count', 'bigint')]

In [26]:
cStateCount50.printSchema()

root
 |-- customer_state: string (nullable = true)
 |-- state_count: long (nullable = false)



In [27]:
type(cStateCount50)

pyspark.sql.dataframe.DataFrame

In [28]:
# simmillar to show()
cStateCount50.take(2)

[Row(customer_state='CA', state_count=187),
 Row(customer_state='NY', state_count=79)]

# Saving Dataframe Into Diff Format

In [29]:
!pwd

/home/hduser/Downloads


# RDD OPERATIONS

In [30]:
# Creation of Rdd using parallelize method
# We set the number of partiotion 2 
Rdd=spark.sparkContext.parallelize([1,2,3,4,5,6],2)
Rdd.collect()

[1, 2, 3, 4, 5, 6]

In [31]:
# To get number of partitions
Rdd.getNumPartitions()

2

In [32]:
rdd1=spark.sparkContext.textFile('/home/hduser/Downloads/sharedfolder/war_and_peace.txt')

In [33]:
# take will return 5 lines here
rdd1.take(5)

['The Project Gutenberg EBook of War and Peace, by Leo Tolstoy',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with almost',
 'no restrictions whatsoever.  You may copy it, give it away or re-use it',
 'under the terms of the Project Gutenberg License included with this']

In [34]:
rdd1.count()

63877

In [35]:
# return the first element
rdd1.first()

'The Project Gutenberg EBook of War and Peace, by Leo Tolstoy'

In [36]:
print(rdd1.filter(lambda x :x.startswith('R')).collect())

['Revolution was a grand thing!" continued Monsieur Pierre, betraying by', "Reaching the large house near the Horse Guards' barracks, in which", 'Rostov.', "Rostova's carriage in which they were seated drove over the straw", 'Russia and zat ze safety and dignity of ze Empire as vell as ze sanctity', "Running into Sonya's room and not finding her there, Natasha ran to the", 'Rays of gentle light shone from her large, timid eyes. Those eyes lit up', "Red patches appeared on Princess Mary's face and she was silent as if", 'Russia were settling near the fortress of Braunau and burdening the', 'Russian troops and their commander of the laurels they have been', 'Russian army will be fully equipped, and shall then, in conjunction with', "Russian army found vent in anger at Zherkov's untimely jest.", 'Rostov waved his cap above his head like the German and cried laughing,', 'Rostov took the money and, mechanically arranging the old and new coins', 'Rostov thrust the purse under the pillow and 

In [37]:
# det get_partitions(a):
#     yield sum(1 for_in a)

# rdd1.mapPartition(get_partitions).collect()


In [38]:
# due to take command 5 lines get converted into the list
rdd2=rdd1.take(5)
(rdd2)

['The Project Gutenberg EBook of War and Peace, by Leo Tolstoy',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with almost',
 'no restrictions whatsoever.  You may copy it, give it away or re-use it',
 'under the terms of the Project Gutenberg License included with this']

In [39]:
# take retunr output in the format of list
type(rdd1.take(5))

list

In [40]:
# to find the non_empty lines
non_null= rdd1.filter(lambda x: len(x)>0)
type(non_null)

pyspark.rdd.PipelinedRDD

In [41]:
non_null.count()

50902

In [42]:
non_null.take(5)

['The Project Gutenberg EBook of War and Peace, by Leo Tolstoy',
 'This eBook is for the use of anyone anywhere at no cost and with almost',
 'no restrictions whatsoever.  You may copy it, give it away or re-use it',
 'under the terms of the Project Gutenberg License included with this',
 'eBook or online at www.gutenberg.org']

In [43]:
# slit the each line into words. flatmap work on each word / each character
words=non_null.flatMap(lambda line: line.split())

In [44]:
words.collect()

['The',
 'Project',
 'Gutenberg',
 'EBook',
 'of',
 'War',
 'and',
 'Peace,',
 'by',
 'Leo',
 'Tolstoy',
 'This',
 'eBook',
 'is',
 'for',
 'the',
 'use',
 'of',
 'anyone',
 'anywhere',
 'at',
 'no',
 'cost',
 'and',
 'with',
 'almost',
 'no',
 'restrictions',
 'whatsoever.',
 'You',
 'may',
 'copy',
 'it,',
 'give',
 'it',
 'away',
 'or',
 're-use',
 'it',
 'under',
 'the',
 'terms',
 'of',
 'the',
 'Project',
 'Gutenberg',
 'License',
 'included',
 'with',
 'this',
 'eBook',
 'or',
 'online',
 'at',
 'www.gutenberg.org',
 'Title:',
 'War',
 'and',
 'Peace',
 'Author:',
 'Leo',
 'Tolstoy',
 'Translators:',
 'Louise',
 'and',
 'Aylmer',
 'Maude',
 'Posting',
 'Date:',
 'January',
 '10,',
 '2009',
 '[EBook',
 '#2600]',
 'Last',
 'Updated:',
 'March',
 '15,',
 '2013',
 'Language:',
 'English',
 '***',
 'START',
 'OF',
 'THIS',
 'PROJECT',
 'GUTENBERG',
 'EBOOK',
 'WAR',
 'AND',
 'PEACE',
 '***',
 'An',
 'Anonymous',
 'Volunteer,',
 'and',
 'David',
 'Widger',
 'BOOK',
 'ONE:',
 '1805',
 

In [45]:

count1=words.map(lambda x: (x,1))

In [46]:
count1.collect()

[('The', 1),
 ('Project', 1),
 ('Gutenberg', 1),
 ('EBook', 1),
 ('of', 1),
 ('War', 1),
 ('and', 1),
 ('Peace,', 1),
 ('by', 1),
 ('Leo', 1),
 ('Tolstoy', 1),
 ('This', 1),
 ('eBook', 1),
 ('is', 1),
 ('for', 1),
 ('the', 1),
 ('use', 1),
 ('of', 1),
 ('anyone', 1),
 ('anywhere', 1),
 ('at', 1),
 ('no', 1),
 ('cost', 1),
 ('and', 1),
 ('with', 1),
 ('almost', 1),
 ('no', 1),
 ('restrictions', 1),
 ('whatsoever.', 1),
 ('You', 1),
 ('may', 1),
 ('copy', 1),
 ('it,', 1),
 ('give', 1),
 ('it', 1),
 ('away', 1),
 ('or', 1),
 ('re-use', 1),
 ('it', 1),
 ('under', 1),
 ('the', 1),
 ('terms', 1),
 ('of', 1),
 ('the', 1),
 ('Project', 1),
 ('Gutenberg', 1),
 ('License', 1),
 ('included', 1),
 ('with', 1),
 ('this', 1),
 ('eBook', 1),
 ('or', 1),
 ('online', 1),
 ('at', 1),
 ('www.gutenberg.org', 1),
 ('Title:', 1),
 ('War', 1),
 ('and', 1),
 ('Peace', 1),
 ('Author:', 1),
 ('Leo', 1),
 ('Tolstoy', 1),
 ('Translators:', 1),
 ('Louise', 1),
 ('and', 1),
 ('Aylmer', 1),
 ('Maude', 1),
 ('Posting

In [47]:
# Reduce 
wordCounts = count1.reduceByKey(lambda prev, next: prev + next)

In [48]:
wordCounts.collect()

[('battle--', 1),
 ('century)', 1),
 ('reminiscence', 1),
 ('Empereur.', 1),
 ('snap.', 1),
 ('circle', 34),
 ('spectators.', 1),
 ('brigade--gave', 1),
 ('principals', 2),
 ('dream)', 3),
 ('rustle,', 1),
 ('Anonymous', 1),
 ('passing:', 2),
 ('now"', 1),
 ('evenly.', 1),
 ('something?"', 1),
 ('review', 12),
 ('Can', 22),
 ('habitual,', 1),
 ('twice,', 2),
 ('mean?"', 9),
 ('remembers...', 1),
 ('escape,', 3),
 ("Malasha's,", 1),
 ('Montmorencys', 1),
 ('Bronnitski,', 1),
 ('whole?', 1),
 ('tangled', 3),
 ('same.', 20),
 ('domo', 1),
 ('why', 217),
 ('limitations,', 2),
 ('Bolkonski,', 37),
 ('Just', 74),
 ('preceding', 10),
 ('usually', 34),
 ('cookers.', 1),
 ('Soldiers', 7),
 ('passe', 1),
 ('play.', 8),
 ('mended', 2),
 ('untimely', 2),
 ('frightfully', 1),
 ('Fedeshon!...', 1),
 ('ranks,', 14),
 ('grapeshot?"', 1),
 ('Austrians!', 1),
 ("We've", 3),
 ('conservatory.', 2),
 ('crop', 3),
 ('souverain', 1),
 ('cheese,"', 1),
 ('Everywhere,', 1),
 ('cavities', 1),
 ('Helena.', 1),
 

In [49]:
#The .sortByKey() Transformation
# -> It will sort by key . key : Abhay , value : 29

marks_rdd = spark.sparkContext.parallelize([('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22), ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)])
print(marks_rdd.sortByKey('ascending').collect())


[('Abhay', 29), ('Abhay', 26), ('Rahul', 25), ('Rahul', 23), ('Rohan', 22), ('Rohan', 22), ('Shreya', 22), ('Shreya', 28), ('Swati', 26), ('Swati', 19)]


In [50]:
# GroupBykey

marks_rdd = spark.sparkContext.parallelize([('Rahul', 25), ('Swati', 26), ('Shreya', 22), ('Abhay', 29), ('Rohan', 22), ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)])
dict_rdd = marks_rdd.groupByKey().collect()
for key, value in dict_rdd:
    print(key, list(value))

Shreya [22, 28]
Swati [26, 19]
Rahul [25, 23]
Abhay [29, 26]
Rohan [22, 22]


In [51]:
# CountBykey

marks_rdd = spark.sparkContext.parallelize([('Rahul', 25), ('Swati', 26), ('Rohan', 22), ('Rahul', 23), ('Swati', 19), ('Shreya', 28), ('Abhay', 26), ('Rohan', 22)])
dict_rdd = marks_rdd.countByKey().items()
for key, value in dict_rdd:
    print(key, value)

Rahul 2
Rohan 2
Swati 2
Abhay 1
Shreya 1


# RDD TO DATAFRAEME

In [52]:
rdd3=spark.sparkContext.textFile("/home/hduser/Downloads/products.tsv")

In [53]:
rdd3.collect()

["98660\tO'Brien Men's Neoprene Life Vest\tFishing\t45\t49.98\t2\t12080",
 "97749\tO'Brien Men's Neoprene Life Vest\tFishing\t45\t49.98\t2\t12279",
 "103889\tUnder Armour Women's Ignite Slide\tBoxing & MMA\t12\t31.99\t4\t3343",
 "22751\tO'Brien Men's Neoprene Life Vest\tFishing\t45\t49.98\t5\t12382",
 '129001\tPelican Sunstream 100 Kayak\tBoating\t47\t199.99\t1\t5032',
 "165110\tNike Men's CJ Elite 2 TD Football Cleat\tCleats\t17\t129.99\t1\t2663",
 "140220\tDiamondback Women's Serene Classic Comfort Bi\tBike & Skate Shop\t42\t299.98\t1\t702",
 '77426\tField & Stream Sportsman 16 Gun Fire Safe\tHunting & Shooting\t44\t399.98\t1\t3959',
 '101870\tPerfect Fitness Perfect Rip Deck\tAs Seen on  TV!\t16\t59.99\t4\t1836',
 "131685\tNike Men's CJ Elite 2 TD Football Cleat\tCleats\t17\t129.99\t1\t5212",
 '164177\tPelican Sunstream 100 Kayak\tBoating\t47\t199.99\t1\t4037',
 "78983\tNike Men's CJ Elite 2 TD Football Cleat\tCleats\t17\t129.99\t1\t7528",
 "84894\tDiamondback Women's Serene Classic

In [54]:
# FLATMAP : IT WILL CREATE SINGLE ARRAY
dd=rdd3.flatMap(lambda x: x.split("\t"))

In [55]:
dd.collect()

['98660',
 "O'Brien Men's Neoprene Life Vest",
 'Fishing',
 '45',
 '49.98',
 '2',
 '12080',
 '97749',
 "O'Brien Men's Neoprene Life Vest",
 'Fishing',
 '45',
 '49.98',
 '2',
 '12279',
 '103889',
 "Under Armour Women's Ignite Slide",
 'Boxing & MMA',
 '12',
 '31.99',
 '4',
 '3343',
 '22751',
 "O'Brien Men's Neoprene Life Vest",
 'Fishing',
 '45',
 '49.98',
 '5',
 '12382',
 '129001',
 'Pelican Sunstream 100 Kayak',
 'Boating',
 '47',
 '199.99',
 '1',
 '5032',
 '165110',
 "Nike Men's CJ Elite 2 TD Football Cleat",
 'Cleats',
 '17',
 '129.99',
 '1',
 '2663',
 '140220',
 "Diamondback Women's Serene Classic Comfort Bi",
 'Bike & Skate Shop',
 '42',
 '299.98',
 '1',
 '702',
 '77426',
 'Field & Stream Sportsman 16 Gun Fire Safe',
 'Hunting & Shooting',
 '44',
 '399.98',
 '1',
 '3959',
 '101870',
 'Perfect Fitness Perfect Rip Deck',
 'As Seen on  TV!',
 '16',
 '59.99',
 '4',
 '1836',
 '131685',
 "Nike Men's CJ Elite 2 TD Football Cleat",
 'Cleats',
 '17',
 '129.99',
 '1',
 '5212',
 '164177',
 '

In [56]:
# MAP : CREATED THE ARRAY(ARRAY)
dd1=rdd3.map(lambda x : x.split("\t"))
dd1.collect()

[['98660',
  "O'Brien Men's Neoprene Life Vest",
  'Fishing',
  '45',
  '49.98',
  '2',
  '12080'],
 ['97749',
  "O'Brien Men's Neoprene Life Vest",
  'Fishing',
  '45',
  '49.98',
  '2',
  '12279'],
 ['103889',
  "Under Armour Women's Ignite Slide",
  'Boxing & MMA',
  '12',
  '31.99',
  '4',
  '3343'],
 ['22751',
  "O'Brien Men's Neoprene Life Vest",
  'Fishing',
  '45',
  '49.98',
  '5',
  '12382'],
 ['129001',
  'Pelican Sunstream 100 Kayak',
  'Boating',
  '47',
  '199.99',
  '1',
  '5032'],
 ['165110',
  "Nike Men's CJ Elite 2 TD Football Cleat",
  'Cleats',
  '17',
  '129.99',
  '1',
  '2663'],
 ['140220',
  "Diamondback Women's Serene Classic Comfort Bi",
  'Bike & Skate Shop',
  '42',
  '299.98',
  '1',
  '702'],
 ['77426',
  'Field & Stream Sportsman 16 Gun Fire Safe',
  'Hunting & Shooting',
  '44',
  '399.98',
  '1',
  '3959'],
 ['101870',
  'Perfect Fitness Perfect Rip Deck',
  'As Seen on  TV!',
  '16',
  '59.99',
  '4',
  '1836'],
 ['131685',
  "Nike Men's CJ Elite 2 TD 

In [57]:
# cREATED THE SCHEMA 'COLS' , TODF : TO CONVERT RDD -> DATAFRAME

cols=["customer_id","customer_name","customer_city","customer_state","customer_zipcode"]
df5=dd1.toDF(cols).show()

+-----------+--------------------+------------------+--------------+----------------+---+-----+
|customer_id|       customer_name|     customer_city|customer_state|customer_zipcode| _6|   _7|
+-----------+--------------------+------------------+--------------+----------------+---+-----+
|      98660|O'Brien Men's Neo...|           Fishing|            45|           49.98|  2|12080|
|      97749|O'Brien Men's Neo...|           Fishing|            45|           49.98|  2|12279|
|     103889|Under Armour Wome...|      Boxing & MMA|            12|           31.99|  4| 3343|
|      22751|O'Brien Men's Neo...|           Fishing|            45|           49.98|  5|12382|
|     129001|Pelican Sunstream...|           Boating|            47|          199.99|  1| 5032|
|     165110|Nike Men's CJ Eli...|            Cleats|            17|          129.99|  1| 2663|
|     140220|Diamondback Women...| Bike & Skate Shop|            42|          299.98|  1|  702|
|      77426|Field & Stream Sp...|Huntin

In [58]:
H=((dd1.toDF(cols)))

In [59]:
H.show()

+-----------+--------------------+------------------+--------------+----------------+---+-----+
|customer_id|       customer_name|     customer_city|customer_state|customer_zipcode| _6|   _7|
+-----------+--------------------+------------------+--------------+----------------+---+-----+
|      98660|O'Brien Men's Neo...|           Fishing|            45|           49.98|  2|12080|
|      97749|O'Brien Men's Neo...|           Fishing|            45|           49.98|  2|12279|
|     103889|Under Armour Wome...|      Boxing & MMA|            12|           31.99|  4| 3343|
|      22751|O'Brien Men's Neo...|           Fishing|            45|           49.98|  5|12382|
|     129001|Pelican Sunstream...|           Boating|            47|          199.99|  1| 5032|
|     165110|Nike Men's CJ Eli...|            Cleats|            17|          129.99|  1| 2663|
|     140220|Diamondback Women...| Bike & Skate Shop|            42|          299.98|  1|  702|
|      77426|Field & Stream Sp...|Huntin

In [60]:
H.select('customer_name').show()

+--------------------+
|       customer_name|
+--------------------+
|O'Brien Men's Neo...|
|O'Brien Men's Neo...|
|Under Armour Wome...|
|O'Brien Men's Neo...|
|Pelican Sunstream...|
|Nike Men's CJ Eli...|
|Diamondback Women...|
|Field & Stream Sp...|
|Perfect Fitness P...|
|Nike Men's CJ Eli...|
|Pelican Sunstream...|
|Nike Men's CJ Eli...|
|Diamondback Women...|
|Nike Men's CJ Eli...|
|Nike Men's Dri-FI...|
|O'Brien Men's Neo...|
|O'Brien Men's Neo...|
|Nike Men's Dri-FI...|
|Diamondback Women...|
|Under Armour Girl...|
+--------------------+
only showing top 20 rows



# USE HIVE IN PYSPARK

In [61]:
spark1 = SparkSession.builder.appName("Python Hive Connect Example") \
        .config("spark.sql.warehouse.dir","hdfs://localhost:54310/user/hive/warehouse") \
        .enableHiveSupport() \
        .getOrCreate()


# IN CONFIGURAION WE PASSED THE HDFS LOCALHOST
# USED .enableHiveSupport() TO START HIVE


In [62]:
spark1.sql('show databases').show()

+------------+
|databaseName|
+------------+
|     default|
+------------+



In [63]:
spark1.sql('creta database mb01')

ParseException: "\nmismatched input 'creta' expecting {'(', 'SELECT', 'FROM', 'ADD', 'DESC', 'WITH', 'VALUES', 'CREATE', 'TABLE', 'INSERT', 'DELETE', 'DESCRIBE', 'EXPLAIN', 'SHOW', 'USE', 'DROP', 'ALTER', 'MAP', 'SET', 'RESET', 'START', 'COMMIT', 'ROLLBACK', 'REDUCE', 'REFRESH', 'CLEAR', 'CACHE', 'UNCACHE', 'DFS', 'TRUNCATE', 'ANALYZE', 'LIST', 'REVOKE', 'GRANT', 'LOCK', 'UNLOCK', 'MSCK', 'EXPORT', 'IMPORT', 'LOAD'}(line 1, pos 0)\n\n== SQL ==\ncreta database mb01\n^^^\n"