# Get Dataset

In [1]:
# Mount Google Drive at /gdrive so the dataset archive stored there can be
# read from this Colab runtime.
from google.colab import drive
drive.mount('/gdrive')

Mounted at /gdrive


In [7]:
!unzip /gdrive/MyDrive/dataset.zip

Archive:  /gdrive/MyDrive/dataset.zip
  inflating: weather-sa-2017-2019-clean.csv  


## Read Dataset

In [111]:
import pandas as pd
# Load the extracted weather CSV into a pandas DataFrame.
df = pd.read_csv('/content/weather-sa-2017-2019-clean.csv')

In [112]:
# Display the frame (pandas truncates the middle rows of large frames).
df

Unnamed: 0.1,Unnamed: 0,city,date,time,year,month,day,hour,minute,weather,temp,wind,humidity,barometer,visibility
0,0,Qassim,1 January 2017,00:00,2017,1,1,24,0,Clear,17,11,64%,1018.0,16
1,1,Qassim,1 January 2017,01:00,2017,1,1,1,0,Clear,17,6,64%,1018.0,16
2,2,Qassim,1 January 2017,03:00,2017,1,1,3,0,Clear,15,11,72%,1019.0,16
3,3,Qassim,1 January 2017,04:00,2017,1,1,4,0,Clear,15,11,72%,1019.0,16
4,4,Qassim,1 January 2017,05:00,2017,1,1,5,0,Clear,15,9,72%,1019.0,16
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
249018,2848,Jawf,30 April 2019,19:00,2019,4,30,19,0,Passing clouds,32,19,14%,1014.0,-1
249019,2849,Jawf,30 April 2019,20:00,2019,4,30,20,0,Passing clouds,29,9,22%,1015.0,-1
249020,2850,Jawf,30 April 2019,21:00,2019,4,30,21,0,Passing clouds,27,7,24%,1016.0,-1
249021,2851,Jawf,30 April 2019,22:00,2019,4,30,22,0,Clear,26,0,26%,1017.0,16


In [113]:
# Drop the leftover index column that pandas labels "Unnamed: ...".
# The guard keeps a re-run of this cell from dropping a real data column
# (an unconditional drop of columns[0] removes one more column per run).
if str(df.columns[0]).startswith('Unnamed'):
    df = df.drop(columns=df.columns[0])

## MapReduce

### Mapper

In [114]:
def mapper(dataset):
  """Extract the year, month and temperature columns as parallel lists.

  Iterating a DataFrame yields its column labels; every label that equals
  ``year``, ``month`` or ``temp`` once surrounding whitespace is stripped
  contributes its values to the matching output list.

  Args:
    dataset: pandas DataFrame to read the columns from.

  Returns:
    Tuple ``(years, months, temps)`` of lists. A list is empty when the
    corresponding column is not present in ``dataset``.
  """
  collected = {'year': [], 'month': [], 'temp': []}
  for label in dataset:
    name = str(label).strip()
    if name in collected:
      # Read from the argument (not a notebook-level global) and index with
      # the original label so padded column names do not raise KeyError.
      collected[name].extend(dataset[label])
  return collected['year'], collected['month'], collected['temp']


# Map step: `data` is the (years, months, temps) tuple of lists returned by mapper.
data=mapper(df)


### Reducer

In [115]:
def reducer(data):
    """Aggregate temperatures per (year, month).

    Args:
        data: Sequence ``(years, months, temps)`` of equally long iterables,
            as returned by ``mapper``.

    Returns:
        pandas DataFrame indexed by ``(Year, Month)`` with two columns:
        ``Temp`` (sum of temperatures) and ``Temp_avg`` (mean temperature).
    """
    frame = pd.DataFrame({'Year': data[0], 'Month': data[1], 'Temp': data[2]})
    # Named aggregation computes both statistics in a single group-by pass
    # and yields plain column names.
    return frame.groupby(['Year', 'Month']).agg(
        Temp=('Temp', 'sum'),
        Temp_avg=('Temp', 'mean'),
    )

        

                
# Reduce step. Feeding the mapper output in directly (instead of re-reading
# `data`) keeps this cell re-runnable: once `data` holds the reduced frame it
# is no longer a valid reducer input.
data = reducer(mapper(df))

In [116]:
# Display the per-(Year, Month) temperature sum and average.
data

Unnamed: 0_level_0,Unnamed: 1_level_0,Temp,Temp_avg
Year,Month,Unnamed: 2_level_1,Unnamed: 3_level_1
2017,1,137778,16.011389
2017,2,121745,15.459683
2017,3,182319,20.786569
2017,4,226180,25.991726
2017,5,279789,29.673242
2017,6,295879,32.672151
2017,7,329677,34.542854
2017,8,322923,33.974014
2017,9,286648,31.29345
2017,10,252830,26.602483


## Spark

In [1]:
!pip install pyspark



In [2]:
from pyspark.sql import SparkSession

# Create (or reuse) a Spark session with Hive support enabled, so that the
# spark.sql(...) cells below can create databases and tables.
spark = SparkSession.builder.appName('Hive_database').enableHiveSupport().getOrCreate()

In [8]:
# Read the same CSV as a Spark DataFrame: the first line is the header and
# column types are inferred. From here on `df` is a Spark DataFrame.
df = spark.read.csv(
    '/content/weather-sa-2017-2019-clean.csv',
    header=True,
    inferSchema=True,
)

In [9]:
# Preview the first rows of the Spark DataFrame.
df.show()

+---+------+--------------+-----+----+-----+---+----+------+-------+----+----+--------+---------+----------+
|_c0|  city|          date| time|year|month|day|hour|minute|weather|temp|wind|humidity|barometer|visibility|
+---+------+--------------+-----+----+-----+---+----+------+-------+----+----+--------+---------+----------+
|  0|Qassim|1 January 2017|00:00|2017|    1|  1|  24|     0| Clear |  17|  11|     64%|     1018|        16|
|  1|Qassim|1 January 2017|01:00|2017|    1|  1|   1|     0| Clear |  17|   6|     64%|     1018|        16|
|  2|Qassim|1 January 2017|03:00|2017|    1|  1|   3|     0| Clear |  15|  11|     72%|     1019|        16|
|  3|Qassim|1 January 2017|04:00|2017|    1|  1|   4|     0| Clear |  15|  11|     72%|     1019|        16|
|  4|Qassim|1 January 2017|05:00|2017|    1|  1|   5|     0| Clear |  15|   9|     72%|     1019|        16|
|  5|Qassim|1 January 2017|06:00|2017|    1|  1|   6|     0| Clear |  13|  13|     82%|     1019|        16|
|  6|Qassim|1 Janua

In [10]:
# Check the column types produced by schema inference.
df.printSchema()

root
 |-- _c0: integer (nullable = true)
 |-- city: string (nullable = true)
 |-- date: string (nullable = true)
 |-- time: string (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- day: integer (nullable = true)
 |-- hour: integer (nullable = true)
 |-- minute: integer (nullable = true)
 |-- weather: string (nullable = true)
 |-- temp: integer (nullable = true)
 |-- wind: integer (nullable = true)
 |-- humidity: string (nullable = true)
 |-- barometer: string (nullable = true)
 |-- visibility: integer (nullable = true)



### Spark Grouping by month and year

In [11]:
from pyspark.sql.functions import avg
# Average temperature per (year, month). No orderBy before the grouping:
# groupBy does not preserve input order, so sorting first only adds cost.
# The result is sorted where it is displayed.
data_group = df.groupBy("year", "month").agg(avg("temp").alias("AVG"))

In [12]:
# Sort by the aggregated frame's own columns (referenced by name) rather than
# by column objects taken from a different DataFrame.
data_group.sort("year", "month").show()

+----+-----+------------------+
|year|month|               AVG|
+----+-----+------------------+
|2017|    1| 16.01138872748402|
|2017|    2| 15.45968253968254|
|2017|    3|20.786569376353892|
|2017|    4|25.991726039990805|
|2017|    5|29.673242125357937|
|2017|    6| 32.67215106007067|
|2017|    7| 34.54285414920369|
|2017|    8|33.974013677012096|
|2017|    9| 31.29344978165939|
|2017|   10|26.602483164983166|
|2017|   11|21.211630957586358|
|2017|   12| 16.18076923076923|
|2018|    1|14.898592977563696|
|2018|    2|18.877053669222345|
|2018|    3| 23.53194396993509|
|2018|    4|  24.5737881508079|
|2018|    5|29.794104152328515|
|2018|    6| 32.85965942454492|
|2018|    7| 33.69704209328783|
|2018|    8| 32.91702758149077|
+----+-----+------------------+
only showing top 20 rows



In [13]:
# Check the schema of the aggregated result.
data_group.printSchema()

root
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- AVG: double (nullable = true)



In [14]:
# Remove the unnamed index column (shown as _c0 in the schema above); the Hive
# table created below has no matching field.
df=df.drop("_c0") 

In [None]:
# Write the frame as header-less CSV part files (Spark creates a directory at
# this path). The absolute path matches the one used by LOAD DATA below, and
# mode("overwrite") lets the cell be re-run when the directory already exists.
df.write.mode("overwrite").csv("/content/weather-dataset.csv")

## Hive 

### Create database

In [19]:
# IF NOT EXISTS keeps the cell re-runnable once the database has been created.
spark.sql('create database if not exists hive_data')

DataFrame[]

In [20]:
# List the databases known to the catalog to confirm the new one exists.
spark.sql('show databases').show()

+-------------+
|    namespace|
+-------------+
|      default|
|    hive_data|
|hive_database|
+-------------+



In [21]:
# Make hive_data the current database for the statements that follow.
spark.sql('use hive_data')

DataFrame[]

### Create Table

In [22]:
# Comma-delimited text table whose columns follow the order of the CSV written
# above. IF NOT EXISTS keeps the cell re-runnable once the table exists.
spark.sql("""
    create table if not exists hive_tables (
        city string, date string, time string,
        year integer, month integer, day integer, hour integer, minute integer,
        weather string, temp integer, wind integer,
        humidity string, barometer string, visibility integer
    )
    row format delimited fields terminated by ','
""")

DataFrame[]

In [23]:
# List the tables of the current database.
spark.sql("show tables").show()

+---------+-----------+-----------+
|namespace|  tableName|isTemporary|
+---------+-----------+-----------+
|hive_data|hive_tables|      false|
+---------+-----------+-----------+



In [24]:
# Show the table's columns and metadata without truncating long values.
spark.sql("describe formatted hive_tables").show(truncate = False)

+----------------------------+----------------------------+-------+
|col_name                    |data_type                   |comment|
+----------------------------+----------------------------+-------+
|city                        |string                      |null   |
|date                        |string                      |null   |
|time                        |string                      |null   |
|year                        |int                         |null   |
|month                       |int                         |null   |
|day                         |int                         |null   |
|hour                        |int                         |null   |
|minute                      |int                         |null   |
|weather                     |string                      |null   |
|temp                        |int                         |null   |
|wind                        |int                         |null   |
|humidity                    |string            

In [27]:
# Load the CSV output written earlier into the table, replacing its current
# contents (OVERWRITE).
# NOTE(review): without LOCAL, Hive-style LOAD DATA presumably moves the source
# files instead of copying them — confirm before re-running this cell.
spark.sql("LOAD DATA  INPATH '/content/weather-dataset.csv' overwrite into TABLE hive_tables")

DataFrame[]

In [28]:
# Preview ten rows to check that the fields were split into the right columns.
spark.sql("select * from hive_tables limit 10").show(truncate = False)

+----------------+--------------+-----+----+-----+---+----+------+-------+----+----+--------+---------+----------+
|city            |date          |time |year|month|day|hour|minute|weather|temp|wind|humidity|barometer|visibility|
+----------------+--------------+-----+----+-----+---+----+------+-------+----+----+--------+---------+----------+
|Northern boarder|12 August 2017|17:00|2017|8    |12 |17  |0     |Sunny  |46  |20  |5%      |1002     |16        |
|Northern boarder|12 August 2017|18:00|2017|8    |12 |18  |0     |Sunny  |45  |13  |6%      |1002     |16        |
|Northern boarder|12 August 2017|19:00|2017|8    |12 |19  |0     |Clear  |44  |15  |6%      |1002     |16        |
|Northern boarder|12 August 2017|20:00|2017|8    |12 |20  |0     |Clear  |42  |4   |7%      |1002     |16        |
|Northern boarder|12 August 2017|21:00|2017|8    |12 |21  |0     |Clear  |39  |0   |8%      |1002     |16        |
|Northern boarder|12 August 2017|22:00|2017|8    |12 |22  |0     |Clear  |38  |0

### Grouping by month and year

In [30]:
# Average temperature per (year, month) computed in SQL on the Hive table,
# ordered chronologically.
spark.sql('SELECT year, month,AVG(temp) from hive_tables GROUP BY year , month ORDER BY year,month ;').show(truncate = False)

+----+-----+------------------+
|year|month|avg(temp)         |
+----+-----+------------------+
|2017|1    |16.01138872748402 |
|2017|2    |15.45968253968254 |
|2017|3    |20.786569376353892|
|2017|4    |25.991726039990805|
|2017|5    |29.673242125357937|
|2017|6    |32.67215106007067 |
|2017|7    |34.54285414920369 |
|2017|8    |33.974013677012096|
|2017|9    |31.29344978165939 |
|2017|10   |26.602483164983166|
|2017|11   |21.211630957586358|
|2017|12   |16.18076923076923 |
|2018|1    |14.898592977563696|
|2018|2    |18.877053669222345|
|2018|3    |23.53194396993509 |
|2018|4    |24.5737881508079  |
|2018|5    |29.794104152328515|
|2018|6    |32.85965942454492 |
|2018|7    |33.69704209328783 |
|2018|8    |32.91702758149077 |
+----+-----+------------------+
only showing top 20 rows

