# This ICE - 8 notebook is launched by running the following commands in the terminal:

 
(base) robertfajardo@UroboricForms ~ % source ~/.bash_profile  \
(base) robertfajardo@UroboricForms ~ % pyspark  

For transparency, ~/.bash_profile contains the following two lines, which make the pyspark command start Jupyter Lab with a SparkSession:

export PYSPARK_DRIVER_PYTHON=jupyter \
export PYSPARK_DRIVER_PYTHON_OPTS='lab'

![SNOWFALL](terminal.png)

![Check](bashfile.png)

In [1]:
#This cell confirms the version and pyspark activation
spark

In [2]:

# Initialize the Spark packages and the SparkSession.
# Reference: https://spark.apache.org/docs/latest/sql-data-sources-text.html
#file = spark.sparkContext.textFile("/Users/robertfajardo/Downloads/ml-100k/u.user")

from pyspark.sql import SparkSession

# getOrCreate() returns the session already started by the pyspark launcher, if one exists.
spark = SparkSession.builder.appName("DataFrame").getOrCreate()

sc = spark.sparkContext


# Question 1

## Create a Spark RDD from an external dataset (word_list.txt). Execute transformations and actions.

Here, the data is read from my folder into the Spark context as an RDD using the textFile method. "This method takes a URI for the file (either a local path on the machine, or a hdfs://, s3a://, etc URI) and reads it as a collection of lines." (Apache Spark, 2022)
The RDD, named "file", supports dataset operations such as counting and map/reduce operations.


In [3]:

file = spark.sparkContext.textFile("/Users/robertfajardo/Library/CloudStorage/OneDrive-UNTSystem/UNT Courses/CSCE 5300 Big Data/Week8/word_list-1.txt")


## Change all words to uppercase and show the first two lines.

Converting all words to uppercase lets us count words regardless of case.

In [4]:
myfile_up = file.map(lambda line: line.upper())

In [5]:
myfile_up.take(2)

                                                                                

['THE PROJECT GUTENBERG ETEXT OF MOBY WORD II BY GRADY WARD',
 'COPYRIGHT LAWS ARE CHANGING ALL OVER THE WORLD, BE SURE TO CHECK']

## Count the number of lines.

textFile reads the file as a collection of lines, so each line is one element of the RDD. The count() action therefore returns the number of lines.

In [6]:
file.count()

260

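## Count the occurrences of the word “PROJECT”. 

The "flatMap" transformation applies a function to every element of the RDD and flattens the results, so each line becomes a sequence of individual words. Note that the filter in the cell keeps every token that contains the substring "PROJECT" (for example "PROJECTS"), not only exact matches.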




In [7]:
myfile_up.flatMap(lambda line: line.split(" "))\
    .filter(lambda word: word.count("PROJECT")).count()

32
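
The filter relies on `str.count`, which returns how many times a substring occurs, so any token containing "PROJECT" passes. A minimal sketch in plain Python, using a made-up token list (the `tokens` values are hypothetical and not taken from the dataset), shows how this differs from an exact comparison:

```python
# Hypothetical tokens; not taken from word_list.txt.
tokens = ["THE", "PROJECT", "PROJECTS", "PROJECT,", "GUTENBERG"]

# Same predicate as the notebook cell: truthy whenever "PROJECT" is a substring.
substring_matches = [t for t in tokens if t.count("PROJECT")]

# Exact comparison: only the token "PROJECT" itself passes.
exact_matches = [t for t in tokens if t == "PROJECT"]

print(len(substring_matches))  # 3
print(len(exact_matches))      # 1
```

In the RDD pipeline, the exact test would be `.filter(lambda word: word == "PROJECT")`. Whether the reported count of 32 would change depends on the data.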

## Count the words in the dataset

Now I can count the words: split each line on spaces, map each word to a pair with a count of 1 per occurrence, sum the counts per word, and collect the results.

In [8]:
wordcount = (
    myfile_up.flatMap(lambda line: line.split(" "))  # split each line into words
    .map(lambda word: (word, 1))                     # pair each word with a count of 1
    .reduceByKey(lambda a, b: a + b)                 # sum the counts per word
    .collect()                                       # return the results to the driver
)

                                                                                

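In [9]:

# Note: the slice [1:20] skips index 0 and returns 19 items; wordcount[:20] would return the first 20.
print("Here are the first 20 wordcount results! \n\n"+ str(wordcount[1:20]))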

Here are the first 20 wordcount results! 

[('ETEXT', 21), ('OF', 72), ('MOBY', 5), ('WORD', 6), ('II', 5), ('GRADY', 2), ('COPYRIGHT', 8), ('LAWS', 2), ('ARE', 15), ('OVER', 4), ('BE', 15), ('TO', 55), ('CHECK', 1), ('FOR', 21), ('YOUR', 12), ('COUNTRY', 1), ('FILES!!!', 1), ('PLEASE', 5), ('THIS', 42)]
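
To make the flatMap, map, and reduceByKey steps concrete, here is a minimal sketch of the same logic in plain Python. It runs without Spark, and the two input lines are hypothetical. The dictionary loop stands in for reduceByKey, which Spark performs in parallel across partitions.

```python
lines = ["THE WHALE", "THE SEA THE SHIP"]  # hypothetical input lines

# flatMap: split every line and flatten the pieces into one list of words.
words = [word for line in lines for word in line.split(" ")]

# map: pair each word with a count of 1.
pairs = [(word, 1) for word in words]

# reduceByKey: add up the counts that share the same word.
counts = {}
for word, one in pairs:
    counts[word] = counts.get(word, 0) + one

print(counts)  # {'THE': 3, 'WHALE': 1, 'SEA': 1, 'SHIP': 1}
```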


# Question 2

Create a Spark RDD from an external dataset (shakespeare.txt). Execute transformations and actions.

 


## Change all words to lowercase and show the first 5 lines. 

I'm using the textFile method and giving the output for reference.

In [36]:

shakespeare = spark.sparkContext.textFile("/Users/robertfajardo/Library/CloudStorage/OneDrive-UNTSystem/UNT Courses/CSCE 5300 Big Data/Week8/shakespeare-1.txt")

print("Here are the first 5 lines. Not including the space between lines \n\n")
shakespeare.take(6)

Here are the first 5 lines. Not including the space between lines 




['The Project Gutenberg EBook of The Complete Works of William Shakespeare, by ',
 'William Shakespeare',
 '',
 'This eBook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  You may copy it, give it away or',
 're-use it under the terms of the Project Gutenberg License included']

In [37]:
# Converting words to lowercase
myshake_down = shakespeare.map(lambda line: line.lower())

In [38]:
print("All words are now converted to lowercase. Here are the first 5 lines, excluding the space between the lines. \n\n")

myshake_down.take(6)

All words are now converted to lowercase. Here are the first 5 lines, excluding the space between the lines. 




['the project gutenberg ebook of the complete works of william shakespeare, by ',
 'william shakespeare',
 '',
 'this ebook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever.  you may copy it, give it away or',
 're-use it under the terms of the project gutenberg license included']

## Count the total number of words. 

It is important to strip punctuation from the document first; otherwise the same word with and without attached punctuation is counted as two different words. 
For example, \
("words.", 1) <- with period and ("words", 1) <- without period.

In [39]:
#remove non-words, such as punctuation
def clean(x):
  punc='!"#$%&\'()*+,./:;<=>?@[\\]^_`{|}~-'
  words=x.lower()
  for ch in punc:
    words = words.replace(ch, '')
  return words
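
As an alternative sketch (not the method used in this notebook), the same cleaning can be done in a single pass with `str.translate`. The name `clean_translate` is hypothetical. `string.punctuation` contains the same 32 characters as the `punc` string above.

```python
import string

# Translation table that deletes every ASCII punctuation character.
_DELETE_PUNCT = str.maketrans("", "", string.punctuation)

def clean_translate(line):
    """Lowercase a line and strip ASCII punctuation in a single pass."""
    return line.lower().translate(_DELETE_PUNCT)

print(clean_translate("Re-use it, give it away."))  # reuse it give it away
```

It could be passed to `map` in the same way as `clean`.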

In [40]:
myshake_down = myshake_down.map(clean) #remove punctuation

In [41]:
print("Note that periods and commas have been removed from the lines.\n\n")
myshake_down.take(6)  # punctuation is removed; viewing the first 6 elements



Note that periods and commas have been removed from the lines.




['the project gutenberg ebook of the complete works of william shakespeare by ',
 'william shakespeare',
 '',
 'this ebook is for the use of anyone anywhere at no cost and with',
 'almost no restrictions whatsoever  you may copy it give it away or',
 'reuse it under the terms of the project gutenberg license included']

In [42]:
myshake_down1= myshake_down.flatMap(lambda satir: satir.split(" ")) #split and delimit by space

In [43]:
myshake_down1.take(5)  # all words are split on spaces and cleaned of punctuation

['the', 'project', 'gutenberg', 'ebook', 'of']

In [44]:
print("The shakespeare dataset has a total of "+ str(myshake_down1.count()) + " words.") 
#punctuation is not counted

print("and has a total of "+ str(myshake_down1.distinct().count()) + " distinct words.") 
#punctuation is not counted

The shakespeare dataset has a total of 1410759 words.



and has a total of 28494 distinct words.


                                                                                

<h4> Answer: The Shakespeare dataset has a total of 1,410,759 words and 28,494 distinct words. </h4>
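
Because the lines are split with `split(" ")`, blank lines and double spaces produce empty-string tokens, and those tokens are included in the totals reported here. A minimal sketch in plain Python, using two of the cleaned sample lines shown earlier in this notebook:

```python
# Two of the cleaned sample lines shown earlier in this notebook.
blank_line = ""
double_space_line = "almost no restrictions whatsoever  you may copy it give it away or"

# split(" ") keeps an empty string for a blank line and for each extra space.
print(blank_line.split(" "))                   # ['']
print(double_space_line.split(" ").count(""))  # 1

# split() with no argument splits on runs of whitespace and drops empty strings.
print(blank_line.split())                      # []
print(double_space_line.split().count(""))     # 0
```

Using `split()` in the flatMap, or adding `.filter(lambda w: w != "")`, would exclude the empty tokens. The resulting totals would be lower than those reported and are not computed here.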

## Count the occurrences of the word “is”. 

In [45]:


myshake_down1.flatMap(lambda line: line.split(" "))\
    .filter(lambda word: word.count("is")).count()

                                                                                

36803

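<h4>  Answer: The filter matches 36,803 tokens that contain "is". Because it tests for a substring, this total also includes words such as "this" and "his". </h4>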

##  Count the number of unique words in the dataset. 

The words have already been split out of the lines, so all that remains is to assign a count to each word.

In [46]:

shake_wordcount = myshake_down1.map(lambda word: (word, 1)).reduceByKey(lambda a,b:a +b)

In [47]:
shake_wordcount.take(2)

                                                                                

[('project', 329), ('gutenberg', 257)]

In [48]:
wordcount_columns = ["word","count"]
count = spark.createDataFrame(shake_wordcount, schema = wordcount_columns)
count.printSchema()
count.show(3)

RuntimeError: module compiled against API version 0xe but this version of numpy is 0xd

root
 |-- word: string (nullable = true)
 |-- count: long (nullable = true)

+---------+-----+
|     word|count|
+---------+-----+
|  project|  329|
|gutenberg|  257|
|    ebook|   16|
+---------+-----+
only showing top 3 rows



In [49]:
count.count()

                                                                                

28494

In [50]:
print("There are a total of "+ str(myshake_down1.distinct().count()) + " distinct words.") 


There are a total of 28494 distinct words.


                                                                                

<h4> Answer: There is a total of 28,494 unique (distinct) words in the dataset. </h4>

# Question 3

## Create a Spark DataFrame from the hotel_booking data and execute some queries. 

Previously we read text files; now we read a .csv file with the header and inferSchema options enabled. 
In this format, I can use Spark SQL to gather statistics and to run general SQL queries. 

In [51]:
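# "DROPFORMALIZED" is not a valid parse mode (the valid name is "DROPMALFORMED"), so Spark uses PERMISSIVE.
# The misspelled format name appears to be harmless here because the final .csv(...) call selects the built-in CSV reader.
df = spark.read.format("com.dtabricks.spark.csv").option("mode","DROPFORMALIZED").option("header", True)\
    .option("inferschema", True).\
    csv("/Users/robertfajardo/Library/CloudStorage/OneDrive-UNTSystem/UNT Courses/CSCE 5300 Big Data/Week8/hotel_bookings.csv")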

22/04/11 22:53:16 WARN ParseMode: DROPFORMALIZED is not a valid parse mode. Using PERMISSIVE.
                                                                                

In [52]:
df.printSchema()

root
 |-- hotel: string (nullable = true)
 |-- is_canceled: integer (nullable = true)
 |-- lead_time: integer (nullable = true)
 |-- arrival_date_year: integer (nullable = true)
 |-- arrival_date_month: string (nullable = true)
 |-- arrival_date_week_number: integer (nullable = true)
 |-- arrival_date_day_of_month: integer (nullable = true)
 |-- stays_in_weekend_nights: integer (nullable = true)
 |-- stays_in_week_nights: integer (nullable = true)
 |-- adults: integer (nullable = true)
 |-- children: string (nullable = true)
 |-- babies: integer (nullable = true)
 |-- meal: string (nullable = true)
 |-- country: string (nullable = true)
 |-- market_segment: string (nullable = true)
 |-- distribution_channel: string (nullable = true)
 |-- is_repeated_guest: integer (nullable = true)
 |-- previous_cancellations: integer (nullable = true)
 |-- previous_bookings_not_canceled: integer (nullable = true)
 |-- reserved_room_type: string (nullable = true)
 |-- assigned_room_type: string (nullab

## Get statistics on adults.

 
Show some statistical values (mean, max value) of the adults column. 

In [48]:
df.describe("adults").show()

+-------+------------------+
|summary|            adults|
+-------+------------------+
|  count|            119390|
|   mean|1.8564033838679956|
| stddev|0.5792609988327547|
|    min|                 0|
|    max|                55|
+-------+------------------+
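
For reference, the five rows that `describe` reports can be reproduced in plain Python. This is a minimal sketch on hypothetical values (not taken from hotel_bookings.csv). It assumes, as in Spark SQL, that `stddev` is the sample standard deviation.

```python
import statistics

adults = [2, 2, 1, 3, 0, 2]  # hypothetical values, not taken from hotel_bookings.csv

summary = {
    "count": len(adults),
    "mean": statistics.mean(adults),
    "stddev": statistics.stdev(adults),  # sample standard deviation (n - 1 denominator)
    "min": min(adults),
    "max": max(adults),
}
print(summary["count"], summary["min"], summary["max"])  # 6 0 3
```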



                                                                                

## Count the total number of cancellations by hotel. 

In [49]:
df.groupby("hotel").sum("is_canceled").show()

+------------+----------------+
|       hotel|sum(is_canceled)|
+------------+----------------+
|Resort Hotel|           11122|
|  City Hotel|           33102|
+------------+----------------+



                                                                                

## Register the DataFrame as a global temporary view.

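With the DataFrame registered as a global temporary view, we can gather similar statistics with ordinary SQL queries passed to spark.sql. Global temporary views live in the global_temp database, so the queries refer to the view as global_temp.hotel. 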

In [51]:
df.createOrReplaceGlobalTempView("hotel")

# Calling df.createGlobalTempView("hoteldf") more than once raises an error, so re-running the notebook fails.
# createOrReplaceGlobalTempView (available as of version 2.2.0) avoids this by replacing the existing view.

## Use a query to count the number of records where reservation_status=”canceled”.

In [52]:
cancelled = spark.sql('''
Select count(reservation_status) as cancel from global_temp.hotel where reservation_status='Canceled' 

''')

In [53]:
cancelled.show()

+------+
|cancel|
+------+
| 43017|
+------+



                                                                                

## Use query to count the number of agents.  Group by hotel. 

In [54]:
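# Note: sum(agent) adds up the agent ID values; it does not count agents.
# count(distinct agent) would count the distinct agents per hotel.
agents = spark.sql('''
Select sum(agent) as agents_count, hotel from global_temp.hotel group by hotel

''')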

In [55]:
agents.show()


+------------+------------+
|agents_count|       hotel|
+------------+------------+
|   6929877.0|Resort Hotel|
|   2003876.0|  City Hotel|
+------------+------------+
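
The values in this table are sums of agent ID numbers, which is why they are so large. A minimal sketch in plain Python, on hypothetical (hotel, agent ID) rows that are not taken from hotel_bookings.csv, contrasts what `sum(agent)` computes with what `count(distinct agent)` would compute:

```python
from collections import defaultdict

# Hypothetical (hotel, agent_id) rows; not taken from hotel_bookings.csv.
rows = [
    ("Resort Hotel", 240),
    ("Resort Hotel", 240),
    ("Resort Hotel", 250),
    ("City Hotel", 9),
    ("City Hotel", 9),
]

id_sums = defaultdict(int)          # what sum(agent) computes
distinct_agents = defaultdict(set)  # what count(distinct agent) computes

for hotel, agent_id in rows:
    id_sums[hotel] += agent_id
    distinct_agents[hotel].add(agent_id)

for hotel in id_sums:
    print(hotel, id_sums[hotel], len(distinct_agents[hotel]))
# Resort Hotel 730 2
# City Hotel 18 1
```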



## Use query to count the number of babies when babies are greater than 0 by year. 

In [56]:
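# Note: count(babies) counts the bookings that include at least one baby;
# sum(babies) would total the babies themselves.
baby_number = spark.sql('''

Select arrival_date_year, count(babies) as baby_count 

from global_temp.hotel where babies>0 group by arrival_date_year ''')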

In [57]:
baby_number.show()

+-----------------+----------+
|arrival_date_year|baby_count|
+-----------------+----------+
|             2015|       213|
|             2016|       446|
|             2017|       258|
+-----------------+----------+



                                                                                

## Use a query to sort the number of cancellations by country in decreasing order. 

In [60]:
cancel_number = spark.sql('''

select country, sum(is_canceled) as all_canceled 
    from global_temp.hotel
        group by country 
            order by all_canceled desc  
            ''')

In [61]:
cancel_number.show()

+-------+------------+
|country|all_canceled|
+-------+------------+
|    PRT|       27519|
|    GBR|        2453|
|    ESP|        2177|
|    FRA|        1934|
|    ITA|        1333|
|    DEU|        1218|
|    IRL|         832|
|    BRA|         830|
|    USA|         501|
|    BEL|         474|
|    CHN|         462|
|    CHE|         428|
|    NLD|         387|
|     CN|         254|
|    RUS|         239|
|    AUT|         230|
|    SWE|         227|
|    POL|         215|
|    AGO|         205|
|    NOR|         181|
+-------+------------+
only showing top 20 rows



                                                                                

## References: 
https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations \
https://spark.apache.org/docs/2.2.0/sql-programming-guide.html \
https://sparkbyexamples.com/pyspark-rdd \
https://sparkbyexamples.com/pyspark/different-ways-to-create-dataframe-in-pyspark/ 