<a href="https://colab.research.google.com/github/MarioNavarrete/bigdata-colab/blob/main/bigdata_colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# BigData Tools in Colab!

The idea of this tutorial is to give you the classic BigData tools without using any of your own local resources.

First, we will clone the repo that contains the needed scripts.

In [1]:
#!rm -r /content/bigdata-colab/
!git clone https://github.com/MarioNavarrete/bigdata-colab.git

rm: cannot remove '/content/bigdata-colab/': No such file or directory
Cloning into 'bigdata-colab'...
remote: Enumerating objects: 72, done.
remote: Counting objects: 100% (72/72), done.
remote: Compressing objects: 100% (47/47), done.
remote: Total 72 (delta 34), reused 54 (delta 20), pack-reused 0
Unpacking objects: 100% (72/72), done.


Prepare the dataset for this tutorial. All the scripts expect a CSV file with no index and no header, so we convert the Parquet file into a CSV in that format.

In [2]:
import pandas as pd
parquet_file = '/content/bigdata-colab/dataset/data.parquet'
data = pd.read_parquet(parquet_file)
data = data.dropna()
data.reset_index(inplace = True, drop = True)
data.to_csv('data.csv',index = False, header = False)
data.head()

|   | datetime | a | b | c | d |
|---|---|---|---|---|---|
| 0 | 2016-09-30 00:00:01 | 9.7 | 60.5 | 561.68 | 907.8 |
| 1 | 2016-09-30 00:15:01 | 2.2 | 99.22 | 1818.84 | 1346.4 |
| 2 | 2016-09-30 00:30:01 | 5.5 | 43.56 | 567.53 | 571.2 |
| 3 | 2016-09-30 00:45:01 | 3.8 | 89.54 | 1821.72 | 933.8 |
| 4 | 2016-09-30 01:00:01 | 6.4 | 107.69 | 1319.97 | 911.2 |


Install the environment. The _executable.py_ script does all the work of installing and preparing the BigData environment we need for HDFS, MapReduce, Hive, Sqoop and Spark.

In [3]:
exec(open('/content/bigdata-colab/executable.py').read())

Active services:
1730 NameNode
2210 Jps
2167 JobHistoryServer
1640 ResourceManager
1998 DataNode
2111 NodeManager



## HDFS and MapReduce

Check that we can use Hadoop. At this point we expect an empty file system, so the command prints nothing.

In [4]:
!hadoop fs -ls

Copy the local file into HDFS.

In [5]:
!hadoop fs -put /content/data.csv data.csv
!hadoop fs -ls

Found 1 items
-rw-r--r--   1 root supergroup   56003136 2020-11-04 16:11 data.csv


A single file, as expected.

We have already created a very basic MapReduce job, plus a bash script that runs the commands needed to make it work. It produces the max, min and average of the second column (`a`), grouped by date.

If you want, you can edit the script to point to your own data.
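The repo's _mapper.py_ and _reducer.py_ are not shown here. As a rough idea of how a Hadoop Streaming job like this one works, here is a minimal sketch (our own, not the repo's code) that assumes rows in the `datetime,a,b,c,d` layout written above: the mapper emits `date<TAB>value` pairs for column `a`, Hadoop sorts them by key, and the reducer outputs max, min and average per date, in the same column order used to read the job output below.

```python
from itertools import groupby


def mapper(lines):
    """Emit 'date<TAB>value' for each 'datetime,a,b,c,d' row (no header)."""
    for line in lines:
        fields = line.strip().split(',')
        if len(fields) < 2:
            continue  # skip empty or malformed lines
        date = fields[0].split(' ')[0]  # '2016-09-30 00:00:01' -> '2016-09-30'
        yield f"{date}\t{fields[1]}"


def reducer(sorted_lines):
    """Emit 'date<TAB>max<TAB>min<TAB>avg'; input must already be sorted by key."""
    pairs = (line.rstrip('\n').split('\t') for line in sorted_lines)
    for date, group in groupby(pairs, key=lambda kv: kv[0]):
        values = [float(value) for _, value in group]
        yield f"{date}\t{max(values)}\t{min(values)}\t{sum(values) / len(values)}"


# First rows of data.csv, as shown by data.head() above.
sample = [
    "2016-09-30 00:00:01,9.7,60.5,561.68,907.8",
    "2016-09-30 00:15:01,2.2,99.22,1818.84,1346.4",
    "2016-09-30 00:30:01,5.5,43.56,567.53,571.2",
    "2016-09-30 00:45:01,3.8,89.54,1821.72,933.8",
    "2016-09-30 01:00:01,6.4,107.69,1319.97,911.2",
]

if __name__ == '__main__':
    # Local stand-in for map -> shuffle/sort -> reduce.
    for record in reducer(sorted(mapper(sample))):
        print(record)
```

In a real Streaming job, each function would live in its own script that iterates over `sys.stdin` and prints every yielded line; with hypothetical `my_mapper.py` and `my_reducer.py` scripts, `cat data.csv | python my_mapper.py | sort | python my_reducer.py` mimics the same pipeline locally.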

In [6]:
!bash /content/bigdata-colab/mapreduce/mapreduce.sh

rm: `/user/root/Output_test': No such file or directory
20/11/04 16:11:48 WARN streaming.StreamJob: -file option is deprecated, please use generic option -files instead.
packageJobJar: [/content/bigdata-colab/mapreduce/mapper.py, /content/bigdata-colab/mapreduce/reducer.py, /tmp/hadoop-unjar8916503154460475634/] [] /tmp/streamjob930844558876494403.jar tmpDir=null
20/11/04 16:11:49 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
20/11/04 16:11:49 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
20/11/04 16:11:50 INFO mapred.FileInputFormat: Total input paths to process : 1
20/11/04 16:11:50 INFO mapreduce.JobSubmitter: number of splits:2
20/11/04 16:11:50 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1604506089806_0001
20/11/04 16:11:51 INFO impl.YarnClientImpl: Submitted application application_1604506089806_0001
20/11/04 16:11:51 INFO mapreduce.Job: The url to track the job: http://4bddbf5513f5:8088/proxy/application_1604506089806_0

Check the output in HDFS and copy it to the local file system.

In [7]:
!hadoop fs -ls Out*

Found 2 items
-rw-r--r--   1 root supergroup          0 2020-11-04 16:12 Output_test/_SUCCESS
-rw-r--r--   1 root supergroup      20390 2020-11-04 16:12 Output_test/part-00000


In [8]:
!mkdir -p /content/mapreduce

!hadoop fs -get /user/root/Output_test /content/mapreduce/output

20/11/04 16:12:25 WARN hdfs.DFSClient: DFSInputStream has been closed already
20/11/04 16:12:25 WARN hdfs.DFSClient: DFSInputStream has been closed already


In [9]:
columns = ['date','max_value','min_value','avg_value']

output_mapred = pd.read_csv('/content/mapreduce/output/part-00000', sep = '\t', names=columns)
output_mapred.date = pd.to_datetime(output_mapred.date)
output_mapred.head()

|   | date | max_value | min_value | avg_value |
|---|---|---|---|---|
| 0 | 2016-09-30 | 9317.58 | 0.1 | 397.034175 |
| 1 | 2016-10-01 | 10925.04 | 0.1 | 381.14 |
| 2 | 2016-10-02 | 12467.07 | 0.1 | 552.618267 |
| 3 | 2016-10-03 | 11381.92 | 0.1 | 402.38595 |
| 4 | 2016-10-04 | 13262.81 | 0.1 | 603.710668 |


## Hive

In this short tutorial, we are going to create a new Hive table using three different methods:


1.   Using an interactive Hive command line and pasting a HiveQL statement.
2.   Running the same HiveQL statement from a .sql file.
3.   Importing a MySQL table into Hive using Sqoop.





#### Loading the file from a local path and running interactively

In [11]:
allow_groupby = 'set hive.groupby.orderby.position.alias=true;'

create_statement = """
create table test (datetime timestamp, a double, b double, c double, d double) row format delimited fields terminated by ',';
"""

load_data = "LOAD DATA LOCAL INPATH '/content/data.csv' OVERWRITE INTO TABLE test;"

sql = """
select to_date(datetime),
avg(a) avg_a,
avg(b) avg_b,
avg(c) avg_c,
avg(d) avg_d,
min(a) min_a,
min(b) min_b,
min(c) min_c,
min(d) min_d,
max(a) max_a,
max(b) max_b,
max(c) max_c,
max(d) max_d
from test
group by 1
limit 10;
"""

# Named exit_statement so it does not shadow Python's built-in exit()
exit_statement = "exit;"

full_statement = ''.join([allow_groupby, create_statement, load_data, sql, exit_statement])
full_statement

"set hive.groupby.orderby.position.alias=true;\ncreate table test (datetime timestamp, a double, b double, c double, d double) row format delimited fields terminated by ',';\nLOAD DATA LOCAL INPATH '/content/data.csv' OVERWRITE INTO TABLE test;\nselect to_date(datetime),\navg(a) avg_a,\navg(b) avg_b,\navg(c) avg_c,\navg(d) avg_d,\nmin(a) min_a,\nmin(b) min_b,\nmin(c) min_c,\nmin(d) min_d,\nmax(a) max_a,\nmax(b) max_b,\nmax(c) max_c,\nmax(d) max_d\nfrom test\ngroup by 1\nlimit 10;\nexit;"

In [12]:
# Copy the printed statement and paste it into the Hive command line
# that appears in this cell once you run it.
!hive

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/content/apache-hive-2.3.7-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/content/hadoop-2.7.0/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

Logging initialized using configuration in jar:file:/content/apache-hive-2.3.7-bin/lib/hive-common-2.3.7.jar!/hive-log4j2.properties Async: true
Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
hive> set hive.groupby.orderby.position.alias=true; create table test (datetime timestamp, a double, b double, c double, d double) row format delimited fields terminated by ','; LOAD 
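The same aggregation query appears again for the tables `test`, `test3`, `test4` and `test5`, changing only the table name and the final clause. A small helper such as the hypothetical `daily_stats_sql` below (our own sketch, not part of the repo) could generate it. In Hive, the `set hive.groupby.orderby.position.alias=true;` line is still needed for `group by 1` to work.

```python
def daily_stats_sql(table, columns=('a', 'b', 'c', 'd'), order=False, limit=None):
    """Build the per-day avg/min/max query used throughout this notebook."""
    select = ['to_date(datetime)']
    for func in ('avg', 'min', 'max'):
        # e.g. 'avg(a) avg_a', matching the aliases in the hand-written queries
        select += [f'{func}({col}) {func}_{col}' for col in columns]
    lines = ['select ' + ',\n'.join(select), f'from {table}', 'group by 1']
    if order:
        lines.append('order by 1 asc')
    if limit is not None:
        lines.append(f'limit {limit}')
    return '\n'.join(lines) + ';'


print(daily_stats_sql('test', limit=10))
```

`daily_stats_sql('test', limit=10)` reproduces `sql.strip()` from the cell above. Because the returned string has no final newline, drop just the last character (`[:-1]`) before passing it to `spark.sql()`, rather than the `[:-2]` slice used later on.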

#### Running the same steps using a .sql file

In [None]:
!hive -f /content/bigdata-colab/hive/script.sql
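The contents of _script.sql_ are not shown here. If you want to run your own statement (for example `full_statement`) without pasting it by hand, one option is to save it to a file and call `hive -f`, as in this sketch; the helper name and file location are hypothetical. The Hive CLI also accepts a quoted string directly with `hive -e "<statement>"`.

```python
import shutil
import subprocess
import tempfile
from pathlib import Path


def hive_file_command(statement, path=None):
    """Save HiveQL to a .sql file and return the `hive -f` command that runs it."""
    if path is None:
        # Hypothetical location; any readable path works.
        path = Path(tempfile.mkdtemp()) / 'statement.sql'
    path = Path(path)
    path.write_text(statement)
    return ['hive', '-f', str(path)]


command = hive_file_command('show tables;\n')
if shutil.which('hive'):
    # Only runs where the Hive CLI is on PATH (e.g. after executable.py).
    result = subprocess.run(command, capture_output=True, text=True)
    print(result.stdout)
else:
    print('hive not found; would run:', ' '.join(command))
```

In the notebook you would pass `full_statement` instead of the placeholder `show tables;`. Keep in mind that `create table test` fails if the table already exists from the interactive run.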

#### MySQL + Sqoop to Hive

First we load our data into the MySQL database, and then use Sqoop to move all of it into Hive. We create the same table as in the previous blocks, but name it _test3_.

In [14]:
!mysql -u root --password=password testdb < /content/bigdata-colab/mysql/load_data.sql



In [15]:
!mysql -u root --password=password testdb -e "select * from test3 limit 5"

+---------------------+------+--------+---------+--------+
| datetime            | a    | b      | c       | d      |
+---------------------+------+--------+---------+--------+
| 2016-09-30 00:00:01 |  9.7 |   60.5 |  561.68 |  907.8 |
| 2016-09-30 00:15:01 |  2.2 |  99.22 | 1818.84 | 1346.4 |
| 2016-09-30 00:30:01 |  5.5 |  43.56 |  567.53 |  571.2 |
| 2016-09-30 00:45:01 |  3.8 |  89.54 | 1821.72 |  933.8 |
| 2016-09-30 01:00:01 |  6.4 | 107.69 | 1319.97 |  911.2 |
+---------------------+------+--------+---------+--------+


In [None]:
!sqoop import --connect jdbc:mysql://localhost/testdb --username root --password password --table test3 --hive-import -m 1
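The Sqoop call packs several flags into one line. Purely as annotation, here it is split into a Python list (our own layout, not a file from the repo); joining it with `shlex.join` gives back exactly the command above.

```python
import shlex

# The flags from the sqoop cell above, one per line.
sqoop_import = [
    'sqoop', 'import',
    '--connect', 'jdbc:mysql://localhost/testdb',  # JDBC URL of the source MySQL database
    '--username', 'root',
    '--password', 'password',  # plain text: acceptable only on a throwaway VM like this one
    '--table', 'test3',        # MySQL table to copy
    '--hive-import',           # load the copied data into a Hive table (named test3 by default)
    '-m', '1',                 # a single map task, so no primary key or split column is needed
]

print(shlex.join(sqoop_import))
```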

As in the first Hive block, copy the `sql3` statement from the cell below and paste it into the Hive command line.

In [17]:
sql3 = """set hive.groupby.orderby.position.alias=true;
select to_date(datetime),
avg(a) avg_a,
avg(b) avg_b,
avg(c) avg_c,
avg(d) avg_d,
min(a) min_a,
min(b) min_b,
min(c) min_c,
min(d) min_d,
max(a) max_a,
max(b) max_b,
max(c) max_c,
max(d) max_d
from test3
group by 1
limit 10;
exit;
"""
!hive

SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/content/apache-hive-2.3.7-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/content/hadoop-2.7.0/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.apache.logging.slf4j.Log4jLoggerFactory]

Logging initialized using configuration in jar:file:/content/apache-hive-2.3.7-bin/lib/hive-common-2.3.7.jar!/hive-log4j2.properties Async: true
Hive-on-MR is deprecated in Hive 2 and may not be available in the future versions. Consider using a different execution engine (i.e. spark, tez) or using Hive 1.X releases.
hive> set hive.groupby.orderby.position.alias=true; select to_date(datetime), avg(a) avg_a, avg(b) avg_b, avg(c) avg_c, avg(d) avg_d, min(a) min_a, min(b) min_b, min(c) min_c, min(d) 

## Spark

In this short section, we load the dataset directly from Hive, and also directly from the Parquet and CSV files stored in HDFS.

In [18]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

spark = SparkSession.builder.enableHiveSupport().appName("bigdata-tools").getOrCreate()

#### Reading from Hive

We reuse the SQL statement from the first Hive block, this time on the _test3_ table and ordered by date. We don't need to run any create statements, because we enabled Hive support in our Spark session, so Spark can read the tables already defined in Hive.

In [20]:
sql = """
select to_date(datetime),
avg(a) avg_a,
avg(b) avg_b,
avg(c) avg_c,
avg(d) avg_d,
min(a) min_a,
min(b) min_b,
min(c) min_c,
min(d) min_d,
max(a) max_a,
max(b) max_b,
max(c) max_c,
max(d) max_d
from test3
group by 1
order by 1 asc;
"""
spark.sql(sql[:-2]).show(10)

+---------------------------------+------------------+------------------+------------------+------------------+-----+-----+-----+-----+-------+------+-------+-------+
|to_date(default.test3.`datetime`)|             avg_a|             avg_b|             avg_c|             avg_d|min_a|min_b|min_c|min_d|  max_a| max_b|  max_c|  max_d|
+---------------------------------+------------------+------------------+------------------+------------------+-----+-----+-----+-----+-------+------+-------+-------+
|                       2016-09-30|396.23908333333327| 70.59916666666668|  803.904291666667|      1617.6736875|  0.1|  1.1| 6.25|13.07|9317.58|222.75|3001.68|4406.49|
|                       2016-10-01|380.35087500000003|  67.8066041666667| 831.0852708333329| 1830.497854166665|  0.1| 0.54| 8.81|18.28|10925.0|222.75|3196.76|4666.74|
|                       2016-10-02| 551.4695416666667| 67.49656250000005|  956.866541666667|1795.0325208333338|  0.1| 1.05| 9.06|21.14|12467.1|222.75|3786.72|4272.96
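Each query string in these cells ends with `;\n`, and `sql[:-2]` slices exactly those two characters off before the query reaches `spark.sql()` (depending on the Spark version, its parser may reject a trailing semicolon). If a string ends differently, the slice silently cuts the wrong characters. A more forgiving sketch, with a hypothetical helper name:

```python
def for_spark(statement):
    """Strip whitespace and any trailing ';' so one query string suits both the Hive CLI and spark.sql()."""
    return statement.strip().rstrip(';').rstrip()


query = "\nselect 1 as x\nfrom test3\n;\n"
print(repr(for_spark(query)))
```

With it, the call above would read `spark.sql(for_spark(sql)).show(10)`.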

#### Reading parquet file

First we put the file into HDFS, then we read it from there.

In [21]:
!hadoop fs -put /content/bigdata-colab/dataset/data.parquet data.parquet

spark_df = spark.read.parquet('/user/root/data.parquet')
spark_df.show(5)

+-------------------+------------------+-----------------+------------------+------------------+
|           datetime|                 a|                b|                 c|                 d|
+-------------------+------------------+-----------------+------------------+------------------+
|2016-09-30 00:00:01| 9.700000000000001|             60.5|            561.68|             907.8|
|2016-09-30 00:15:01|               2.2|            99.22|           1818.84|1346.3999999999999|
|2016-09-30 00:30:01|               5.5|            43.56|            567.53| 571.1999999999999|
|2016-09-30 00:45:01|3.8000000000000003|89.53999999999999|1821.7199999999998|             933.8|
|2016-09-30 01:00:01|               6.4|           107.69|           1319.97| 911.1999999999999|
+-------------------+------------------+-----------------+------------------+------------------+
only showing top 5 rows



Next we register the DataFrame as a temporary view named _test4_ and run the same query on it with Spark SQL.

In [22]:
spark_df.createOrReplaceTempView("test4")
sql = """
select to_date(datetime),
avg(a) avg_a,
avg(b) avg_b,
avg(c) avg_c,
avg(d) avg_d,
min(a) min_a,
min(b) min_b,
min(c) min_c,
min(d) min_d,
max(a) max_a,
max(b) max_b,
max(c) max_c,
max(d) max_d
from test4
group by 1
order by 1 asc;
"""

In [23]:
spark.sql(sql[:-2]).show(5)

+-------------------------+------------------+-----------------+------------------+------------------+-----+-----+-----+-----+------------------+------+------------------+-------+
|to_date(test4.`datetime`)|             avg_a|            avg_b|             avg_c|             avg_d|min_a|min_b|min_c|min_d|             max_a| max_b|             max_c|  max_d|
+-------------------------+------------------+-----------------+------------------+------------------+-----+-----+-----+-----+------------------+------+------------------+-------+
|               2016-09-30| 396.2390833333333|70.59916666666668|  803.904291666667|      1617.6736875|  0.1|  1.1| 6.25|13.07|           9317.58|222.75|           3001.68|4406.49|
|               2016-10-01|380.35095833333344| 67.8066041666667| 831.0852708333329| 1830.497854166665|  0.1| 0.54| 8.81|18.28|          10925.04|222.75|3196.7599999999998|4666.74|
|               2016-10-02| 551.4694791666666|67.49656250000005|  956.866541666667|1795.032520833333

#### Reading csv file

We use the CSV file we already stored in HDFS.

In [27]:
spark_df_csv = spark.read.csv('/user/root/data.csv', 
                          schema = 'datetime timestamp, a float, b float, c float, d float',
                          sep = ',')
spark_df_csv.show(5)

+-------------------+---+------+-------+------+
|           datetime|  a|     b|      c|     d|
+-------------------+---+------+-------+------+
|2016-09-30 00:00:01|9.7|  60.5| 561.68| 907.8|
|2016-09-30 00:15:01|2.2| 99.22|1818.84|1346.4|
|2016-09-30 00:30:01|5.5| 43.56| 567.53| 571.2|
|2016-09-30 00:45:01|3.8| 89.54|1821.72| 933.8|
|2016-09-30 01:00:01|6.4|107.69|1319.97| 911.2|
+-------------------+---+------+-------+------+
only showing top 5 rows



In [28]:
spark_df_csv.createOrReplaceTempView("test5")
sql = """
select to_date(datetime),
avg(a) avg_a,
avg(b) avg_b,
avg(c) avg_c,
avg(d) avg_d,
min(a) min_a,
min(b) min_b,
min(c) min_c,
min(d) min_d,
max(a) max_a,
max(b) max_b,
max(c) max_c,
max(d) max_d
from test5
group by 1
order by 1 asc;
"""
spark.sql(sql[:-2]).show(5)

+-------------------------+------------------+-----------------+------------------+------------------+-----+-----+-----+-----+--------+------+-------+-------+
|to_date(test5.`datetime`)|             avg_a|            avg_b|             avg_c|             avg_d|min_a|min_b|min_c|min_d|   max_a| max_b|  max_c|  max_d|
+-------------------------+------------------+-----------------+------------------+------------------+-----+-----+-----+-----+--------+------+-------+-------+
|               2016-09-30| 396.2390839710211|70.59916661133369| 803.9042923967044|1617.6736891508103|  0.1|  1.1| 6.25|13.07| 9317.58|222.75|3001.68|4406.49|
|               2016-10-01|380.35096037131734|67.80660413354636| 831.0852713167667| 1830.497857928276|  0.1| 0.54| 8.81|18.28|10925.04|222.75|3196.76|4666.74|
|               2016-10-02| 551.4694790929556|67.49656240890424| 956.8665415167809|1795.0325181643168|  0.1| 1.05| 9.06|21.14|12467.07|222.75|3786.72|4272.96|
|               2016-10-03| 401.5628545624204|

There are some small differences between the outputs due to the different numeric precision used for the data: the CSV is read with a `float` (32-bit) schema, while the Parquet values are `double` (64-bit).
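The effect can be reproduced with the standard library alone. This sketch (our own, using the five `a` values from `data.head()`) rounds each value to a 32-bit float, as the `float` CSV schema does, and then averages in 64-bit precision; we assume that is roughly how Spark computes `avg()` over a `float` column.

```python
import struct


def to_float32(x):
    """Round a Python float (64-bit) to the nearest 32-bit float, like a `float` column."""
    return struct.unpack('f', struct.pack('f', x))[0]


# Column `a` of the first five rows shown by data.head().
a_values = [9.7, 2.2, 5.5, 3.8, 6.4]

avg_double = sum(a_values) / len(a_values)
# Averaging is done in 64-bit, but the inputs were already rounded to 32-bit.
avg_from_float = sum(to_float32(v) for v in a_values) / len(a_values)

print(to_float32(9.7))  # 9.7 as stored in a 32-bit float
print(avg_double, avg_from_float)
```

The two averages agree to about seven significant digits, which matches the size of the differences between the Parquet and CSV results above. Declaring the CSV columns as `double` in the schema should make the outputs line up more closely.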
