# Using Hadoop's Core: HDFS and MapReduce



In [None]:
from mrjob.job import MRJob
from mrjob.step import MRStep

class RatingsBreakdown(MRJob):
  def steps(self):
    return [
      MRStep(mapper=self.mapper_get_ratings,      # extracts (movieID, 1) from each line
             reducer=self.reducer_count_ratings), # counts the ratings per movie
      MRStep(reducer=self.reducer_sorted_output)  # iterates through the movies for each count
    ]

  def mapper_get_ratings(self, _, line):
    (userID, movieID, rating, timestamp) = line.split('\t')
    yield movieID, 1

  def reducer_count_ratings(self, key, values):
    # zero-pad the count so that string sorting matches numeric sorting
    yield str(sum(values)).zfill(5), key

  def reducer_sorted_output(self, count, movies):
    for movie in movies:
      yield movie, count

if __name__ == '__main__':
  RatingsBreakdown.run()

pass
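
To see what the two steps do without a cluster, here is a minimal sketch that simulates the map, shuffle, and reduce phases in plain Python. The function name `run_ratings_breakdown` and the three sample lines are made up for illustration; they are not part of mrjob or the MovieLens data.

```python
from collections import defaultdict

def run_ratings_breakdown(lines):
    """Simulate the two MapReduce steps on an in-memory list of u.data lines."""
    # Step 1 map: emit (movieID, 1) for every rating line.
    mapped = []
    for line in lines:
        user_id, movie_id, rating, timestamp = line.split('\t')
        mapped.append((movie_id, 1))

    # Shuffle: group the emitted values by key, as the framework would.
    grouped = defaultdict(list)
    for key, value in mapped:
        grouped[key].append(value)

    # Step 1 reduce: emit (zero-padded count, movieID).
    counted = [(str(sum(values)).zfill(5), key) for key, values in grouped.items()]

    # Step 2: keys arrive sorted, so sort by the padded count and flip each pair.
    return [(movie, count) for count, movie in sorted(counted)]

# Hypothetical sample input: userID, movieID, rating, timestamp (tab-separated)
sample = ["1\t50\t5\t881250949", "2\t50\t4\t881250950", "3\t172\t3\t881250951"]
print(run_ratings_breakdown(sample))
```

The zero-padding matters because the shuffle sorts keys as strings: without it, "10" would sort before "9".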


# Programming Hadoop with Pig

## Pig Latin: Things I can do with Pig


1.   LOAD, STORE, DUMP
  *   STORE ratings INTO 'outRatings' USING PigStorage(':');
2.   FILTER, DISTINCT, FOREACH/GENERATE, MAPREDUCE, STREAM, SAMPLE
3.   JOIN, COGROUP, GROUP, CROSS, CUBE
4.   ORDER, RANK, LIMIT
5.   UNION, SPLIT

## Diagnostics

*   DESCRIBE
*   EXPLAIN
*   ILLUSTRATE

## User-Defined Functions (UDFs)

*   REGISTER
*   DEFINE
*   IMPORT

## Some other functions and loaders

*   AVG, CONCAT, COUNT, MAX, MIN, SIZE, SUM
*   PigStorage
*   TextLoader
*   JsonLoader
*   AvroStorage
*   ParquetLoader
*   OrcStorage
*   HBaseStorage










In [None]:
-- Create a relation named "ratings" with a given schema
ratings = LOAD '/user/maria_dev/ml-100k/u.data' AS
  (userID:int, movieID:int, rating:int, ratingTime:int); -- loads the movie rating data; AS gives it a schema

-- u.item contains the actual titles, so metadata associates each movieID with its movieTitle
metadata = LOAD '/user/maria_dev/ml-100k/u.item' USING PigStorage('|')
  AS (movieID:int, movieTitle:chararray, releaseDate:chararray, videoRelease:chararray, imdbLink:chararray);

-- FOREACH ... GENERATE builds a new relation containing just the listed fields
nameLookup = FOREACH metadata GENERATE movieID, movieTitle,
  ToUnixTime(ToDate(releaseDate, 'dd-MMM-yyyy')) AS releaseTime;

-- GROUP BY: creates a bag of all the ratings associated with each movieID
ratingsByMovie = GROUP ratings BY movieID;

-- average the ratings in each bag
avgRatings = FOREACH ratingsByMovie GENERATE group AS movieID, AVG(ratings.rating) AS avgRating;

-- FILTER
fiveStarMovies = FILTER avgRatings BY avgRating > 4.0;

-- JOIN: we need to join in the movie names
fiveStarWithData = JOIN fiveStarMovies BY movieID, nameLookup BY movieID;

-- ORDER BY
oldestFiveStarMovies = ORDER fiveStarWithData BY nameLookup::releaseTime;

DUMP oldestFiveStarMovies;

pass
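
The same GROUP / AVG / FILTER / JOIN / ORDER pipeline can be sketched in plain Python, which makes it easier to check the logic on a handful of rows. This is only an illustration of the data flow, not how Pig executes it; the function name `oldest_top_rated` and the sample rows are hypothetical.

```python
from collections import defaultdict
from datetime import datetime

def oldest_top_rated(ratings, metadata, threshold=4.0):
    """ratings: (userID, movieID, rating); metadata: (movieID, title, releaseDate)."""
    # GROUP ratings BY movieID
    by_movie = defaultdict(list)
    for _user, movie_id, rating in ratings:
        by_movie[movie_id].append(rating)

    # FOREACH ... GENERATE group, AVG(...) then FILTER ... BY avgRating > threshold
    averages = {m: sum(r) / len(r) for m, r in by_movie.items()}
    top = {m: avg for m, avg in averages.items() if avg > threshold}

    # JOIN with the name lookup, parsing the release date along the way
    joined = [
        (title, top[movie_id], datetime.strptime(release, "%d-%b-%Y"))
        for movie_id, title, release in metadata
        if movie_id in top
    ]

    # ORDER BY release time, oldest first
    return [(title, avg) for title, avg, _ in sorted(joined, key=lambda row: row[2])]

# Hypothetical sample data
sample_ratings = [(1, 10, 5), (2, 10, 5), (1, 20, 3), (2, 30, 5), (3, 30, 4)]
sample_metadata = [(10, "Movie A", "01-Jan-1995"),
                   (20, "Movie B", "01-Jan-1990"),
                   (30, "Movie C", "01-Jan-1980")]
print(oldest_top_rated(sample_ratings, sample_metadata))
```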
  


# Another Pig problem to solve


*   Find all movies with an average rating less than 2.0
*   Sort them by total number of ratings



In [None]:
ratings = LOAD '/user/maria_dev/ml-100k/u.data' AS
  (userID:int, movieID:int, rating:int, ratingTime:int); -- loads the movie rating data; AS gives it a schema

-- u.item contains the actual titles, so metadata associates each movieID with its movieTitle
metadata = LOAD '/user/maria_dev/ml-100k/u.item' USING PigStorage('|')
  AS (movieID:int, movieTitle:chararray, releaseDate:chararray, videoRelease:chararray, imdbLink:chararray);

-- FOREACH ... GENERATE builds a new relation containing just the listed fields
nameLookup = FOREACH metadata GENERATE movieID, movieTitle;

-- GROUP BY: creates a bag of all the ratings associated with each movieID
groupedRatings = GROUP ratings BY movieID;

-- average the ratings in each bag and count them
avgRatings = FOREACH groupedRatings GENERATE group AS movieID, AVG(ratings.rating) AS avgRating,
  COUNT(ratings.rating) AS numRatings;

-- FILTER: keep only movies with an average rating below 2.0
badMovies = FILTER avgRatings BY avgRating < 2.0;

-- JOIN: we need to join in the movie names
namedBadMovies = JOIN badMovies BY movieID, nameLookup BY movieID;

finalResults = FOREACH namedBadMovies GENERATE nameLookup::movieTitle AS movieName,
  badMovies::avgRating AS avgRating, badMovies::numRatings AS numRatings;

-- ORDER
finalResultsSorted = ORDER finalResults BY numRatings DESC;

DUMP finalResultsSorted;

# Hive

## Why Hive?

*   Uses familiar SQL syntax (HiveQL)
*   Interactive
*   Scalable - works with big data on a cluster, and is most appropriate for data warehouse applications
*   Easy OLAP queries - way easier than writing MapReduce in Java
*   Highly optimized
*   Highly extensible:
  *   User-defined functions
  *   Thrift server
  *   JDBC/ODBC driver

## Why Not Hive?

*   High latency - not appropriate for OLTP
*   Stores data de-normalized
*   SQL is limited in what it can do:
  *   Pig and Spark allow for more complex processing
*   No transactions
*   No record-level updates, inserts, or deletes

## HiveQL

*   It is pretty much MySQL with some added extensions
*   For example, views:
  *   You can store the results of a query in a "view", which subsequent queries can use as a table.
*   Allows you to specify how structured data is stored and partitioned.
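
To get a feel for how a view behaves, here is a small sketch using Python's built-in `sqlite3` module. SQLite stands in for Hive purely for illustration (an assumption of this example; HiveQL differs in many details), and the three sample rows are invented.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE ratings (userID INT, movieID INT, rating INT)")
conn.executemany("INSERT INTO ratings VALUES (?, ?, ?)",
                 [(1, 50, 5), (2, 50, 4), (3, 172, 3)])

# The view stores the query, not the data; it is re-evaluated whenever it is used.
conn.execute("""
    CREATE VIEW topMovieIDs AS
    SELECT movieID, COUNT(movieID) AS ratingCount
    FROM ratings
    GROUP BY movieID
""")

# Subsequent queries can treat the view like a table.
rows = conn.execute(
    "SELECT movieID, ratingCount FROM topMovieIDs ORDER BY ratingCount DESC"
).fetchall()
print(rows)
```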






### Working with Hive to find the most popular movie in our dataSet.

In [None]:
-- create a view named topMovieIDs with 2 columns
CREATE VIEW topMovieIDs AS
SELECT movieID, COUNT(movieID) AS ratingCount
FROM ratings
GROUP BY movieID
ORDER BY ratingCount DESC; -- shows the data in descending order

SELECT n.title, ratingCount
FROM topMovieIDs t JOIN names n ON t.movieID = n.movieID;

pass

## Hive
*   Schema on read:
  *   Hive maintains a "metastore" that imparts a structure you define onto unstructured data stored on HDFS, etc.
*   Where is the data?
  *   LOAD DATA - MOVES data from a distributed filesystem into Hive
  *   LOAD DATA LOCAL - COPIES data from your local filesystem into Hive
  *   Managed vs. external tables

    CREATE EXTERNAL TABLE IF NOT EXISTS ratings (
      userID INT,
      movieID INT,
      rating INT,
      time INT)
    ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t'
    LOCATION '/data/ml-100k/u.data';

With an external table, Hive uses the data but does not take ownership of it: if we drop the table, the original copy at the stored location stays intact. With a managed table (populated via LOAD DATA), dropping the table deletes the data for good.

*   Partitioning:
  *   You can store data in partitioned subdirectories.
  *   This is a huge optimization if your queries only touch certain partitions.

  CREATE TABLE customers (
    name STRING,
    address STRUCT<street:STRING, city:STRING, zip:INT>
  )
  PARTITIONED BY (country STRING);

      .../customers/country=CA/
      .../customers/country=GB/
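
Why partitioning helps can be sketched in a few lines of Python: each partition value maps to its own subdirectory, so a query that filters on the partition column only needs to read the matching directories. The helper names `partition_path` and `prune` and the `/warehouse/customers` root are hypothetical, chosen just for this illustration.

```python
def partition_path(table_root, **partition_values):
    """Build the subdirectory a row with these partition values is stored under."""
    parts = [f"{column}={value}" for column, value in partition_values.items()]
    return "/".join([table_root.rstrip("/")] + parts)

def prune(partition_dirs, column, wanted):
    """Keep only the directories whose partition value matches the filter."""
    return [d for d in partition_dirs if f"{column}={wanted}" in d.split("/")]

# Hypothetical table root and partition values
dirs = [partition_path("/warehouse/customers", country=c) for c in ("CA", "GB", "US")]
print(prune(dirs, "country", "GB"))
```

A query with `WHERE country = 'GB'` would then scan one directory instead of three.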

# Ways to use Hive
*   Interactively via the hive> prompt / command-line interface (CLI)
*   Saved query files:
  * hive -f /somepath/queries.hql
*   Through Ambari/Hue
*   Through JDBC/ODBC server
*   Through Thrift Service 
  *   Note that Hive is not suitable for OLTP
* Via Oozie


In [None]:
-- Find the movie with the highest average rating
-- Only consider movies with more than 10 ratings

CREATE VIEW IF NOT EXISTS avgRatings AS
SELECT movieID, AVG(rating) AS avgRating, COUNT(movieID) AS ratingCount
FROM ratings
GROUP BY movieID
ORDER BY avgRating DESC;

SELECT n.title, avgRating
FROM avgRatings t JOIN names n ON t.movieID = n.movieID
WHERE ratingCount > 10;

pass




# Integrating MySQL with Hadoop

*   Sqoop can handle big data.
*   It actually kicks off MapReduce jobs to handle importing or exporting your data.
*   Hence the mappers, which distribute the work, pull data from MySQL and write it to HDFS as a set of files that we can then query with Hive or Pig.
*   Sqoop: import data from MySQL to HDFS
  *   sqoop import --connect jdbc:mysql://localhost/movielens --driver com.mysql.jdbc.Driver --table movies
*   Sqoop: import data directly into Hive
  *   sqoop import --connect jdbc:mysql://localhost/movielens --driver com.mysql.jdbc.Driver --table movies --hive-import
*   Sqoop can also do incremental imports:
  *   You can keep your relational database and Hadoop in sync
  *   --check-column and --last-value
*   Sqoop: export data from Hive to MySQL (the table must be created ahead of time)
  *   sqoop export --connect jdbc:mysql://localhost/movielens -m 1 --driver com.mysql.jdbc.Driver --table exported_movies --export-dir /apps/hive/warehouse/movies --input-fields-terminated-by '\0001'
*   The target table must already exist in MySQL, with columns in the expected order.
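
The idea behind an incremental import (pick a check column, remember the last value seen, and fetch only newer rows next time) can be sketched with Python's built-in `sqlite3`. This illustrates the logic only, not how Sqoop implements it; SQLite stands in for MySQL, and `incremental_rows` and the sample table are hypothetical.

```python
import sqlite3

def incremental_rows(conn, table, check_column, last_value):
    """Return rows whose check column exceeds last_value, plus the new high-water mark."""
    # Table and column names are trusted here; only the value is parameterized.
    query = f"SELECT * FROM {table} WHERE {check_column} > ? ORDER BY {check_column}"
    cursor = conn.execute(query, (last_value,))
    columns = [description[0] for description in cursor.description]
    rows = cursor.fetchall()
    index = columns.index(check_column)
    new_last = rows[-1][index] if rows else last_value
    return rows, new_last

# Hypothetical source table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE movies (id INTEGER, title TEXT)")
conn.executemany("INSERT INTO movies VALUES (?, ?)", [(1, "A"), (2, "B"), (3, "C")])

rows, last = incremental_rows(conn, "movies", "id", 1)
print(rows, last)
```

Feeding the returned high-water mark back in as the next `last_value` is what keeps the two stores in sync without re-importing everything.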


MySQL comes already installed in the Hortonworks Sandbox.

*   To log in to MySQL:
*   mysql -u root -p
*   password: hadoop
*   Create a movielens database, then import some data into it:
1.   create database movielens;
2.   show databases; # you will now see the new database
3.   exit
4.   Download the data: wget http://media.sundog-soft.com/hadoop/movielens.sql
5.   Log back into MySQL.
6.   There are some international characters in this dataset that need to be dealt with:
7.   SET NAMES 'utf8';
8.   SET CHARACTER SET utf8;
9.   Tell MySQL to use the database created earlier:
  *   use movielens;
10.   source movielens.sql
11.   show tables; # lists the tables in the database
12.   SELECT * FROM movies LIMIT 10; # to get the first 10 rows
13.   describe ratings;

Using MySQL we can figure out the most-rated movies and their titles:

SELECT movies.title, COUNT(ratings.movie_id) AS ratingCount
FROM movies
INNER JOIN ratings
ON movies.id = ratings.movie_id
GROUP BY movies.title
ORDER BY ratingCount;

14.   exit - to get back to the shell






In [None]:
GRANT ALL PRIVILEGES ON movielens.* TO ''@'localhost';
exit;

pass

# Import Data from MySQL into HDFS

In [None]:
sqoop import --connect jdbc:mysql://localhost/movielens --driver com.mysql.jdbc.Driver --table movies -m 1 # connects to the MySQL database
# and dumps all of the table's data into HDFS

pass


# Putting Data into Hive imported from MySQL

In [None]:
sqoop import --connect jdbc:mysql://localhost/movielens --driver com.mysql.jdbc.Driver --table movies -m 1 --hive-import # this imports the data into Hive
pass

# Using Sqoop to export data from Hadoop to MySQL
Note: you need to make sure the table exists ahead of time in order to receive the data.

*   Log in: mysql -u root -p
*   use movielens;
*   CREATE TABLE exported_movies (id INTEGER, title VARCHAR(255), releaseDate DATE); # this gives us the table we need to receive the data
*   exit

Export the data from Hive back into MySQL:

sqoop export --connect jdbc:mysql://localhost/movielens -m 1 --driver com.mysql.jdbc.Driver --table exported_movies --export-dir /apps/hive/warehouse/movies --input-fields-terminated-by '\0001'



# NoSQL


## HBase

What not to do when dealing with planet-size data:
scaling up MySQL etc. to a massive load requires extreme measures

*   Denormalizing
*   Caching layers
*   Master/slave setups
*   Sharding
*   Materialized views
*   Removing stored procedures
### What is Apache HBase?
*   It is a non-relational, scalable database built on HDFS
*   It is an open-source implementation of Google's Bigtable with a few minor changes.
*   It does not have a query language
*   It uses an API to carry out CRUD operations
  *   Create
  *   Read
  *   Update 
  *   Delete
### HBase data model
*   Fast access to any given row
*   A ROW is referenced by a unique KEY
*   Each ROW has a small number of COLUMN FAMILIES
*   A COLUMN FAMILY may contain arbitrary COLUMNS
*   You can have a large number of COLUMNS in a COLUMN FAMILY
*   Each CELL can have many VERSIONS, each with a given timestamp
*   Sparse data is A-OK: missing columns in a row consume no storage.
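
One way to picture this model is as nested dictionaries: row key, then "family:column", then a list of timestamped versions. The toy class below is only a mental model written for these notes (the name `SparseTable` is made up); it is not the HBase API and says nothing about how HBase stores data on disk.

```python
from collections import defaultdict

class SparseTable:
    """Toy model: row key -> 'family:column' -> list of (timestamp, value) versions."""

    def __init__(self):
        self.rows = defaultdict(lambda: defaultdict(list))

    def put(self, row_key, family, column, value, timestamp):
        cell = self.rows[row_key][f"{family}:{column}"]
        cell.append((timestamp, value))
        cell.sort(reverse=True)  # keep the newest version first

    def get(self, row_key, family, column):
        """Return the newest value of a cell, or None if the column was never written."""
        versions = self.rows.get(row_key, {}).get(f"{family}:{column}")
        return versions[0][1] if versions else None

table = SparseTable()
table.put("1", "rating", "50", "5", timestamp=100)
table.put("1", "rating", "50", "4", timestamp=200)  # a newer version of the same cell
print(table.get("1", "rating", "50"))   # newest version
print(table.get("1", "rating", "999"))  # missing column: nothing stored for it
```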

### Some ways to access HBase
*   HBase shell
*   Java API
  *   Wrappers for Python, Scala, etc
*   Spark, Hive, Pig
*   REST service
*   Thrift Service
*   Avro Service

### Creating an HBase table with Python via REST
Plan:
*   Create an HBase table of movie ratings by user
*   Then quickly query it for individual users
*   This is a good example of sparse data.




In [None]:
from starbase import Connection

# Connect on port 8000, the port I opened in VirtualBox and the one the REST server listens on
c = Connection("127.0.0.1", "8000")

ratings = c.table("ratings") # a handle to a table called ratings

if (ratings.exists()):
  print("Dropping existing table\n")
  ratings.drop()

ratings.create("rating") # creates the table with a column family called rating

print("Parsing the ml-100k ratings data ...\n")
ratingFile = open("c:/downloads/ml-100k/ml-100k/u.data", "r") # open the file read-only

# Instead of adding one row at a time, create a batch object from the ratings table
batch = ratings.batch()

for line in ratingFile: # keep updating the batch with new rows
  (userID, movieID, rating, timestamp) = line.split()
  batch.update(userID, {"rating": {movieID: rating}})

ratingFile.close()

print("Committing ratings data to HBase via REST service\n")
batch.commit(finalize=True)

print("Get back ratings for some users ...\n")
print("Ratings for user ID 1:\n")
print(ratings.fetch("1"))
print("Ratings for user ID 33:\n")
print(ratings.fetch("33"))

ratings.drop()

pass

# Integrating Pig with HBase (again, good for big data)

Plan:

*   Must create the HBase table ahead of time
*   Relations must have a unique key as the first column, followed by the subsequent columns as you want them saved in HBase
*   The USING clause allows you to store into an HBase table
*   Can work at scale - HBase is transactional on rows

In the HBase shell, I am going to create a table:

create 'users', 'userinfo' # hit enter and the table will be created

exit



In [None]:
-- hbase.pig
users = LOAD '/user/maria_dev/ml-100k/u.user'
USING PigStorage('|')
AS (userID:int, age:int, gender:chararray, occupation:chararray, zip:int);

STORE users INTO 'hbase://users'
USING org.apache.pig.backend.hadoop.hbase.HBaseStorage(
    'userinfo:age,userinfo:gender,userinfo:occupation,userinfo:zip'
);
-- end of hbase.pig

# Back at the command line:
pig hbase.pig # runs the script

# Afterwards, in the HBase shell:
disable 'users' # a table must be disabled before it can be dropped
drop 'users' # drops the table
exit # exits the shell
# Finally, shut down HBase.
pass

# Cassandra 
This is a distributed database with no single point of failure.  

## Cassandra - NoSQL with a twist

*   Unlike HBase, there is no master node at all; every node runs exactly the same software and performs the same functions
*   The data model is similar to Bigtable/HBase
*   It is non-relational, but has a limited query language, CQL, as its interface.



## Cassandra's Design Choices
*   The CAP theorem says you can only have 2 out of 3: consistency, availability, partition-tolerance.
  *   Partition-tolerance is a requirement of "big data", so you really only get a choice between consistency and availability.
*   Cassandra favors availability over consistency.
  *   It is "eventually consistent."
  *   But you can specify your consistency requirements as part of each request. Hence, it has "tunable consistency."
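
A common rule of thumb for tunable consistency: with N replicas, a read that contacts R replicas is guaranteed to overlap a write acknowledged by W replicas whenever R + W > N. The helpers below are a small sketch of that arithmetic; the function names are made up and this is not a Cassandra API.

```python
def quorum(replication_factor):
    """Number of replicas that make up a majority."""
    return replication_factor // 2 + 1

def is_strongly_consistent(replication_factor, write_replicas, read_replicas):
    """True when every read set must overlap every write set (R + W > N)."""
    return read_replicas + write_replicas > replication_factor

n = 3
print(quorum(n))                                         # majority of 3 replicas
print(is_strongly_consistent(n, quorum(n), quorum(n)))   # quorum writes + quorum reads
print(is_strongly_consistent(n, 1, 1))                   # one replica each way
```

Reading and writing at a single replica is faster and more available, but a read may then miss the latest write until the replicas converge.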

## Cassandra and Cluster
*   Cassandra is great for fast access to rows of information
*   Get the best of both worlds by replicating to another ring that is used for analytics and Spark integration.

## CQL
*   Cassandra's API is CQL, which makes it easy to look like existing database drivers to applications.
*   CQL is like SQL, but with some big limitations!
  *   NO JOINS
    *   Your data must be de-normalized
    *   So it is still non-relational
  *   All queries must be on some primary key:
    *   Secondary indices are supported, but...
*   CQLSH can be used on the command line to create tables, etc.
*   All tables must be in a *keyspace* - keyspaces are like databases.

# Cassandra and Spark
*   DataStax offers a spark-cassandra-connector.
*   It allows you to read and write Cassandra tables as DataFrames.
*   It is smart about passing queries on those DataFrames down to the appropriate level.
*   Use cases:
  *   Use Spark for analytics on data stored in Cassandra
  *   Use Spark to transform data and store it into Cassandra for transactional use.

### Plan:
*   Install Cassandra on Hadoop Node
*   Setup a table for movielens users
*   Write into that table and query it from Spark

In [None]:
cqlsh> CREATE KEYSPACE movielens WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'} AND durable_writes = true;
cqlsh> USE movielens;
cqlsh:movielens> CREATE TABLE users (user_id int, age int, gender text, occupation text, zip text, PRIMARY KEY (user_id));
cqlsh:movielens> DESCRIBE TABLE users; -- shows the table definition
pass

## Write Spark output into Cassandra


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql import Row


def parseInput(line):
  fields = line.split("|")
  return Row(user_id=int(fields[0]), age=int(fields[1]), gender=fields[2], occupation=fields[3], zip=fields[4])

if __name__ == "__main__":
  # Create a SparkSession and point it at the Cassandra host
  spark = SparkSession.builder.appName("CassandraIntegration").config("spark.cassandra.connection.host", "127.0.0.1").getOrCreate()

  # Get the raw data
  lines = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.user")
  # Convert it to an RDD of Row objects with (user_id, age, gender, occupation, zip)
  users = lines.map(parseInput)
  # Convert that into a DataFrame
  usersDataset = spark.createDataFrame(users)

  # Write this into Cassandra
  usersDataset.write\
  .format("org.apache.spark.sql.cassandra")\
  .mode("append")\
  .options(table="users", keyspace="movielens")\
  .save()

  # Read it back from Cassandra into a new DataFrame
  readUsers = spark.read\
  .format("org.apache.spark.sql.cassandra")\
  .options(table="users", keyspace="movielens")\
  .load()

  readUsers.createOrReplaceTempView("users")

  sqlDF = spark.sql("SELECT * FROM users WHERE age < 20")
  sqlDF.show()

  # Stop the session
  spark.stop()


In [None]:
# to import the file 
wget(web address of the file)
# telling spark what version I am using
export SPARK_MAJOR_VERSION=2
spark-submit --packages datastax:spark-cassandra-connector:2.0.0-m2-s_2.11 CassandraSpark.py

pass


Finally, after running:

exit and stop Cassandra (service cassandra stop)

# MongoDB

### Integrating Spark with MongoDB




In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions
from pyspark.sql import Row


def parseInput(line):
  fields = line.split("|")
  return Row(user_id=int(fields[0]), age=int(fields[1]), gender=fields[2], occupation=fields[3], zip=fields[4])

if __name__ == "__main__":
  # Create a SparkSession
  spark = SparkSession.builder.appName("MongoDBIntegration").getOrCreate()

  # Get the raw data
  lines = spark.sparkContext.textFile("hdfs:///user/maria_dev/ml-100k/u.user")
  # Convert it to an RDD of Row objects with (user_id, age, gender, occupation, zip)
  users = lines.map(parseInput)
  # Convert that into a DataFrame
  usersDataset = spark.createDataFrame(users)

  # Write this into MongoDB
  usersDataset.write\
  .format("com.mongodb.spark.sql.DefaultSource")\
  .mode("append")\
  .option("uri", "mongodb://127.0.0.1/movielens.users")\
  .save()

  # Read it back from MongoDB into a new DataFrame
  readUsers = spark.read\
  .format("com.mongodb.spark.sql.DefaultSource")\
  .option("uri", "mongodb://127.0.0.1/movielens.users")\
  .load()

  readUsers.createOrReplaceTempView("users")

  sqlDF = spark.sql("SELECT * FROM users WHERE age < 20")
  sqlDF.show()

  # Stop the session
  spark.stop()

In [None]:
# Using sandbox I am going to run this code
less MongoSpark.py
export SPARK_MAJOR_VERSION=2
spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.0.0 MongoSpark.py

pass

# Setting up Apache Drill
*   wget http://archive.apache.org/dist/drill/drill-1.12.0/apache-drill-1.12.0.tar.gz
*   tar -xvf apache-drill-1.12.0.tar.gz # this decompresses the entire package
*   cd apache-drill-1.12.0 # once the package is decompressed we can cd into its directory
*   bin/drillbit.sh start -Ddrill.exec.port=8765 # starts the Drill server. We override the default port with one that is open to the outside world so that we can communicate with Drill (HDP runs in a Docker container, so just opening a port in VirtualBox won't work). Once drillbit is up on that port, open a browser at 127.0.0.1:8765.
*   In the Drill web UI, click Storage and enable hive and mongo if they are disabled.
*   Finally, hit the Update button next to hive and we are connected.

### Query across multiple databases with Drill
The nice thing about Drill is that it speaks standard SQL. Hence, we query it using SQL as we would any database.

In [None]:
SHOW DATABASES; -- shows all the databases
SELECT * FROM hive.movielens.ratings LIMIT 10; -- returns the first 10 rows from Hive
SELECT * FROM mongo.movielens.users LIMIT 10;  -- returns the first 10 rows from MongoDB
SELECT u.occupation, COUNT(*) FROM hive.movielens.ratings r JOIN mongo.movielens.users u ON r.user_id = u.user_id GROUP BY u.occupation; -- queries two different databases, joins them, and groups the result
-- back at the command line, "bin/drillbit.sh stop" shuts down drillbit
pass

## Apache Phoenix

*   A SQL driver for HBase that supports transactions
*   Fast, low-latency - OLTP support
*   Originally developed by Salesforce, then open-sourced
*   Exposes a JDBC connector for HBase
*   Supports secondary indices and user-defined functions
*   Integrates with MapReduce, Spark, Hive, Pig, and Flume

### Using Phoenix
*   Command-Line Interface(CLI)
*   Phoenix API for Java
*   JDBC driver (thick client)
*   Phoenix Query Server(PQS)(thin client)
  *   Intended to eventually handle non-JVM access
*   JARs for MapReduce, Spark, Hive, Pig and Flume





## Installing phoenix
*   Ensure HBase is running before installing Phoenix
*   yum install phoenix # this command installs Phoenix
*   python sqlline.py # this kicks off Phoenix
*   !tables # lists all the tables Phoenix knows about. Phoenix runs on top of HBase, and all queries use SQL. Some commands, such as INSERT, are not available; we use UPSERT instead.


## Executing some SQL commands
*   CREATE TABLE IF NOT EXISTS us_population (state CHAR(2) NOT NULL, city VARCHAR NOT NULL, population BIGINT CONSTRAINT my_pk PRIMARY KEY (state, city));


In [None]:
 UPSERT INTO US_POPULATION VALUES ('NY', 'New York', 8143197); -- inserts these values into the table
 SELECT * FROM US_POPULATION; -- returns the whole table
 SELECT * FROM US_POPULATION WHERE STATE='NY';
 DROP TABLE US_POPULATION;
 !quit


## Integrate Phoenix with Pig to go into HBase
*   python sqlline.py # this takes us to our Phoenix prompt
*   CREATE TABLE users (USERID INTEGER NOT NULL, AGE INTEGER, GENDER CHAR(1), OCCUPATION VARCHAR, ZIP VARCHAR CONSTRAINT pk PRIMARY KEY (USERID)); # this gives us the users table in HBase
*   !tables # to make sure the users table is there
*   !quit

Go back to the sandbox and fetch the data:
*   mkdir ml-100k
*   cd ml-100k/
*   wget (web address) to retrieve the data
*   cd .. # back to the home dir

*   wget http://media.sundog-soft.com/hadoop/phoenix.pig
  *   REGISTER /usr/hdp/current/phoenix-client/phoenix-client.jar
  *   users = LOAD '/user/maria_dev/ml-100k/u.user'
  *   USING PigStorage('|')
  *   AS (USERID:int, AGE:int, GENDER:chararray, OCCUPATION:chararray, ZIP:chararray);

  *   STORE users INTO 'hbase://users' USING org.apache.phoenix.pig.PhoenixHBaseStorage('localhost', '-batchSize 5000');

  *   occupations = LOAD 'hbase://table/users/USERID,OCCUPATION' USING org.apache.phoenix.pig.PhoenixHBaseLoader('localhost');

  *   grpd = GROUP occupations BY OCCUPATION;
  *   cnt = FOREACH grpd GENERATE group AS OCCUPATION, COUNT(occupations);
  *   DUMP cnt;
  *   (end of phoenix.pig)

*   pig phoenix.pig # this will run the script


# Presto
*   It is a lot like Drill
  *   It can connect to many different "big data" databases and data stores at once and query across them
  *   Familiar SQL syntax
  *   Optimized for OLAP - analytical queries, data warehousing
  *   Developed and partially maintained by Facebook
  *   Exposes JDBC, command-line, and Tableau interfaces
  *   Unlike Drill, it has a Cassandra connector
  *   It's good enough for Facebook, Dropbox, and Airbnb
  *   A single Presto query can combine data from multiple sources, allowing analytics across an entire organisation.
  *   Presto breaks the false choice between fast analytics on an expensive commercial solution and a slow free solution that requires excessive hardware.

## Install Presto and query Hive with it
*   wget https://repo1.maven.org/maven2/com/facebook/presto/presto-server/0.243.1/presto-server-0.243.1.tar.gz
*   tar -xvf presto-server-0.243.1.tar.gz
*   cd presto-server-0.243.1
*   wget http://media.sundog-soft.com/hadoop/presto-hdp-config.tgz # config files
*   tar -xvf presto-hdp-config.tgz # this unpacks the config and creates the etc files
*   cd etc

We need a command-line interface. Following the documentation (make sure you are in the bin directory of the Presto server):

*   wget https://repo1.maven.org/maven2/com/facebook/presto/presto-cli/0.243.1/presto-cli-0.243.1-executable.jar # run this to get the CLI

*   mv presto-cli-0.243.1-executable.jar presto # rename the file
*   chmod +x presto # it is now an executable command that kicks off the CLI with the appropriate parameters
*   cd .. # takes me back to the top of the Presto server directory
*   pwd # shows you where you are
*   bin/launcher start # launches the server
*   127.0.0.1:8090 # open this in a browser to bring up the dashboard showing the cluster overview

*   bin/presto --server 127.0.0.1:8090 --catalog hive # running this allows me to connect to the server

*   show tables from default; # my ratings table shows up in this case
*   select * from default.ratings limit 10; # gives me back the first 10 rows
*   select * from default.ratings where rating = 5 limit 10; # returns 10 five-star ratings
*   select count(*) from default.ratings where rating = 1; # counts up all the one-star ratings (6110 of them)
*   quit # to end
*   bin/launcher stop # stops Presto

## Query both Hive and Cassandra using Presto
*   scl enable python27 bash
*   service cassandra start # starts Cassandra
*   nodetool enablethrift # Presto needs this to be able to communicate with Cassandra
*   cqlsh --cqlversion="3.4.0" # loads up the Cassandra CLI
*   describe keyspaces; # movielens shows up
*   use movielens;
*   describe tables;
*   select * from users limit 10;

Now I am going to connect this to Presto, together with the ratings data living in Hive, and combine them in one query. Before I can query the data, Presto needs a configuration file for Cassandra. From the Presto server directory:
*   cd etc/catalog
*   vi cassandra.properties (hit the "I" key to insert)
*   connector.name=cassandra
*   cassandra.contact-points=127.0.0.1
*   Esc
*   :wq # writes and quits the file
*   cd ../.. # move up by 2 directories
*   bin/launcher start # starts Presto
*   bin/presto --server 127.0.0.1:8090 --catalog hive,cassandra

Running queries now:
*   show tables from cassandra.movielens; # displays my "users" table
*   describe cassandra.movielens.users;
*   select * from cassandra.movielens.users limit 10;
*   select * from hive.default.ratings limit 10;
*   select u.occupation, count(*) from hive.default.ratings r JOIN cassandra.movielens.users u on r.user_id = u.user_id group by u.occupation; # groups the users by unique occupation and counts up the relevant ratings
*   quit
*   bin/launcher stop # stops Presto
*   service cassandra stop










# Feeding data into my cluster

## Kafka
### Streaming
*   Lets you publish this data in real time to your cluster
  *   You can even process it in real time as your data comes in.

*   Two problems:
  *   How to get data from many sources flowing into your cluster.
  *   Processing it when it gets there.

### Kafka
*   Kafka is a general-purpose publish/subscribe messaging system.
*   Kafka servers store all of the incoming messages from publishers for some period of time, and publish them to a stream of data called a topic.
*   Kafka consumers subscribe to one or more topics, and receive data as it is published.
*   A stream/topic can have many consumers, each with its own position in the stream maintained.
*   It's not just for Hadoop.

### How Kafka Scales
*   Kafka itself may be distributed among many processes on many servers.
  *   This also distributes the storage of stream data.
*   Consumers may also be distributed:
  *   Consumers in the same group will have messages distributed among them.
  *   Consumers in different groups will each get their own copy of every message.
### Kafka Project Plan
*   Set up a topic
  *   Publish some data to it, and watch it get consumed
*   Set up a file connector
  *   Monitor a log file and publish additions to it.

*   ./kafka-topics.sh --create --zookeeper sandbox.hortonworks.com:2181 --replication-factor 1 --partitions 1 --topic fred # this creates our stream
*   ./kafka-topics.sh --list --zookeeper sandbox.hortonworks.com:2181 # lists the topics, to make sure ours was created
*   ./kafka-console-producer.sh --broker-list sandbox.hortonworks.com:6667 --topic fred # this Kafka producer reads lines typed at the keyboard and broadcasts each one on the topic "fred"

Two messages are going to be published:

  this is a line of data

  i am sending this on the fred topic

Open a second window on the sandbox:
*   cd /usr/hdp/current/kafka-broker/bin
*   ./kafka-console-consumer.sh --bootstrap-server sandbox.hortonworks.com:6667 --zookeeper localhost:2181 --topic fred --from-beginning # when this consumer is kicked off, it receives the two lines of data sent above

### Publishing web logs with Kafka
 I am going to use a built-in Kafka connector to monitor a file and publish new lines from that file to a topic, which then gets written out to another file somewhere else (a stand-in for a log processor).

 Kafka has a file connector built in. I just need to configure it.

*   cd .. # out of the bin folder
*   cd conf # there are some sample configuration files here
*   Since these files belong to Kafka, I will make copies in my home dir so that I can do whatever I want to them.
*   cp connect-standalone.properties ~/
*   cp connect-file-sink.properties ~/
*   cp connect-file-source.properties ~/
*   cd ~ # to edit them

When editing a file in vi:
to insert text, hit the I key;
to leave, hit Esc, then :wq


*   vi connect-standalone.properties # the bootstrap server needs to reflect the correct host and port (sandbox.hortonworks.com:6667); then write and quit with ":wq"
*   vi connect-file-sink.properties # file=/home/maria_dev/logout.txt sets the name of the output file. We also need to specify the topic we are going to listen to: topics=log-test
*   vi connect-file-source.properties # file=/home/maria_dev/access_log_small.txt and topic=log-test

Yeehaa, everything is configured!

wget http://media.sundog-soft.com/hadoop/access_log_small.txt
less access_log_small.txt # take a look at the data

Set up a consumer to print out what is going on in the "log-test" topic:

./kafka-console-consumer.sh --bootstrap-server sandbox.hortonworks.com:6667 --topic log-test --zookeeper localhost:2181

Then, in another window, start the connector:

*   cd /usr/hdp/current/kafka-broker/bin
*   ./connect-standalone.sh ~/connect-standalone.properties ~/connect-file-source.properties ~/connect-file-sink.properties









# Apache Flume (streaming)
## There are 3 components of a Fume agaent
*   Source
  *   Where is the data coming from
  *   Can optionally have Channel selectors and Interceptors
*   Channel
  *   How the data is transferred (via memory or files)
*   Sink
  *   Where the data is going
  *   Can be organized into sink groups
  *   A sink can only be connected to one channel
    *   The channel is notified to delete a message once the sink has processed it.
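
The hand-off between channel and sink can be sketched in plain Python. This is a toy model under simplifying assumptions, not Flume code; `MemoryChannel` and `drain` are hypothetical names. The point it illustrates is that an event leaves the channel only after the sink has handled it.

```python
from collections import deque


class MemoryChannel:
    """Bounded in-memory buffer sitting between a source and a sink."""

    def __init__(self, capacity):
        self.capacity = capacity
        self.events = deque()

    def put(self, event):
        if len(self.events) >= self.capacity:
            raise OverflowError("channel is full")
        self.events.append(event)

    def peek(self):
        return self.events[0] if self.events else None

    def commit(self):
        # Called only after the sink has handled the event successfully.
        self.events.popleft()


def drain(channel, sink):
    """Deliver events to the sink, deleting each one only after it succeeds."""
    delivered = 0
    while channel.peek() is not None:
        sink(channel.peek())  # if this raises, the event stays in the channel
        channel.commit()
        delivered += 1
    return delivered


channel = MemoryChannel(capacity=1000)
for line in ["first", "second", "third"]:
    channel.put(line)

written = []
print(drain(channel, written.append), written)
```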


## A single-node Flume configuration(setting up a flow)

### Name the components on this agent
*   a1.sources = r1
*   a1.sinks = k1
*   a1.channels = c1

### Describe and configure the source
*   a1.sources.r1.type = netcat # Unix utility that can listen on a port for traffic
*   a1.sources.r1.bind = localhost # bind it to localhost and listen on port 4444
*   a1.sources.r1.port = 4444

### Describe the sink
*   a1.sinks.k1.type = logger # writes everything to the log from flume

### Use a channel which buffers events in memory
*   a1.channels.c1.type = memory
*   a1.channels.c1.capacity = 1000
*   a1.channels.c1.transactionCapacity = 100

### Bind the source and the sink to the channel
*   a1.sources.r1.channels = c1
*   a1.sinks.k1.channel = c1
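
Every property key has the shape `agent.kind.name.setting`, where `kind` is `sources`, `sinks` or `channels` and `name` must be one of the components declared at the top. A mistyped key such as `a1.sources.k1.bind` or `a1.sink.k1.channel` does not attach to any declared component. The following is a small sketch of a sanity check for that; `check_flume_config` is a hypothetical helper, not something that ships with Flume.

```python
def check_flume_config(text, agent="a1"):
    """Return a list of problems found in a Flume properties file."""
    props = {}
    for raw in text.splitlines():
        line = raw.split("#", 1)[0].strip()  # drop trailing "# note" comments
        if "=" in line:
            key, value = line.split("=", 1)
            props[key.strip()] = value.strip()

    # Component names declared by e.g. "a1.sources = r1"
    declared = {
        kind: set(props.get(f"{agent}.{kind}", "").split())
        for kind in ("sources", "sinks", "channels")
    }

    problems = []
    for key, value in props.items():
        parts = key.split(".")
        if parts[0] != agent:
            problems.append(f"{key}: unknown agent '{parts[0]}'")
        elif len(parts) < 3:
            continue  # the declaration lines themselves
        elif parts[1] not in declared:
            problems.append(f"{key}: unknown component type '{parts[1]}'")
        elif parts[2] not in declared[parts[1]]:
            problems.append(f"{key}: '{parts[2]}' is not declared in {agent}.{parts[1]}")
        elif parts[3:] in (["channel"], ["channels"]):
            for channel in value.split():
                if channel not in declared["channels"]:
                    problems.append(f"{key}: channel '{channel}' is not declared")
    return problems


GOOD = """
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 4444
a1.sinks.k1.type = logger
a1.channels.c1.type = memory
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
"""
print(check_flume_config(GOOD))
```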






Time to kick off Flume

*   cd /usr/hdp/current/flume-server/
*   bin/flume-ng agent --conf conf --conf-file ~/example.conf --name a1 -Dflume.root.logger=INFO,console # this means that every event and informational message that comes through is printed to the console.
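
Once the agent is running, anything sent to the netcat source on port 4444 should appear in the Flume console log. A minimal sketch of a test client in Python, assuming the agent from `example.conf` is listening on localhost:4444; `send_lines` is a hypothetical helper name.

```python
import socket


def send_lines(host, port, lines):
    """Open a TCP connection and send each line, newline-terminated."""
    with socket.create_connection((host, port), timeout=5) as sock:
        for line in lines:
            sock.sendall((line + "\n").encode("utf-8"))


# Usage, with the Flume agent already started:
# send_lines("localhost", 4444, ["hello flume", "second event"])
```

Each newline-terminated line becomes one Flume event, which the logger sink then prints.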



## Set up Flume to monitor a directory and store its data in HDFS

### A single-node Flume configuration(setting up a flow)

Name the components on this agent
*   a1.sources = r1
*   a1.sinks = k1
*   a1.channels = c1

Describe and configure the source
*   a1.sources.r1.type = spooldir # watches a directory for new files and pushes them to the HDFS sink; the interceptor below adds a timestamp to each event
*   a1.sources.r1.spoolDir = /home/maria_dev/spool
*   a1.sources.r1.fileHeader = true
*   a1.sources.r1.interceptors = timestampInterceptor
*   a1.sources.r1.interceptors.timestampInterceptor.type = timestamp

Describe the sink
*   a1.sinks.k1.type = hdfs # writes everything to hdfs from flume
*   a1.sinks.k1.hdfs.path = /user/maria_dev/flume/%y-%m-%d/%H%M/%S
*   a1.sinks.k1.hdfs.filePrefix = events-
*   a1.sinks.k1.hdfs.round = true
*   a1.sinks.k1.hdfs.roundValue = 10
*   a1.sinks.k1.hdfs.roundUnit = minute

Use a channel which buffers events in memory
*   a1.channels.c1.type = memory
*   a1.channels.c1.capacity = 1000
*   a1.channels.c1.transactionCapacity = 100

Bind the source and the sink to the channel
*   a1.sources.r1.channels = c1
*   a1.sinks.k1.channel = c1
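
The `hdfs.path` escapes are filled in from each event's timestamp (which is why the timestamp interceptor is configured), and the `round` settings round that timestamp down to a 10-minute boundary first. A rough Python sketch of the resulting directory name, assuming the escapes behave like their `strftime` counterparts; `rounded_hdfs_path` is a hypothetical helper.

```python
from datetime import datetime


def rounded_hdfs_path(ts, round_minutes=10):
    """Round ts down to a multiple of round_minutes, then fill in the path."""
    rounded = ts.replace(
        minute=ts.minute - ts.minute % round_minutes,
        second=0,
        microsecond=0,
    )
    return rounded.strftime("/user/maria_dev/flume/%y-%m-%d/%H%M/%S")


print(rounded_hdfs_path(datetime(2024, 3, 5, 14, 37, 42)))
# /user/maria_dev/flume/24-03-05/1430/00
```

All events from the same 10-minute span therefore land in the same directory.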


### Since I have configured a spoolDir source, I need to create that directory.
*   mkdir spool
*   then set up the destination folder on HDFS through Ambari

Time to kick off Flume

*   cd /usr/hdp/current/flume-server/
*   bin/flume-ng agent --conf conf --conf-file ~/flumelogs.conf --name a1 -Dflume.root.logger=INFO,console # this means that every event and informational message that comes through is printed to the console.

## Analyze web logs published with Flume using Spark Streaming

I need to write the config file:
A single-node Flume configuration (setting up a flow)
Name the components on this agent

*   a1.sources = r1
*   a1.sinks = k1
*   a1.channels = c1

Describe and configure the source
*   a1.sources.r1.type = spooldir # watches a directory for new files and pushes them to the sink; the interceptor below adds a timestamp to each event
*   a1.sources.r1.spoolDir = /home/maria_dev/spool
*   a1.sources.r1.fileHeader = true
*   a1.sources.r1.interceptors = timestampInterceptor
*   a1.sources.r1.interceptors.timestampInterceptor.type = timestamp

Describe the sink
*   a1.sinks.k1.type = avro
*   a1.sinks.k1.hostname = localhost
*   a1.sinks.k1.port = 9092 # must match the port the Spark script listens on in FlumeUtils.createStream

Use a channel which buffers events in memory
*   a1.channels.c1.type = memory
*   a1.channels.c1.capacity = 1000
*   a1.channels.c1.transactionCapacity = 100

Bind the source and the sink to the channel
*   a1.sources.r1.channels = c1
*   a1.sinks.k1.channel = c1


In [None]:
# Spark Streaming script

import re

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

parts = [
    r'(?P<host>\S+)',                   # host %h
    r'\S+',                             # ident %l (unused)
    r'(?P<user>\S+)',                   # user %u
    r'\[(?P<time>.+)\]',                # time %t
    r'"(?P<request>.+)"',               # request "%r"
    r'(?P<status>[0-9]+)',              # status %>s
    r'(?P<size>\S+)',                   # size %b (careful, can be '-')
    r'"(?P<referer>.*)"',               # referer "%{Referer}i"
    r'"(?P<agent>.*)"',                 # user agent "%{User-agent}i"
]
pattern = re.compile(r'\s+'.join(parts)+r'\s*\Z')

def extractURLRequest(line):
    exp = pattern.match(line)
    if exp:
        request = exp.groupdict()["request"]
        if request:
            requestFields = request.split()
            if len(requestFields) > 1:
                return requestFields[1]


if __name__ == "__main__":

    sc = SparkContext(appName="StreamingFlumeLogAggregator")
    sc.setLogLevel("ERROR")
    ssc = StreamingContext(sc, 1)

    flumeStream = FlumeUtils.createStream(ssc, "localhost", 9092)

    lines = flumeStream.map(lambda x: x[1])
    urls = lines.map(extractURLRequest)

    # Reduce by URL over a 5-minute window sliding every second
    urlCounts = urls.map(lambda x: (x, 1)).reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y : x - y, 300, 1)

    # Sort and print the results
    sortedResults = urlCounts.transform(lambda rdd: rdd.sortBy(lambda x: x[1], False))
    sortedResults.pprint()

    ssc.checkpoint("/home/maria_dev/checkpoint")
    ssc.start()
    ssc.awaitTermination()
pass
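
The second lambda passed to `reduceByKeyAndWindow` is the inverse of the first: instead of recounting the whole 5-minute window on every slide, the running totals are updated by adding the batch that entered the window and subtracting the batch that left it. A plain-Python sketch of that idea follows (no Spark required; `windowed_counts` is a hypothetical name, and dropping keys that fall to zero is a simplification made here).

```python
from collections import Counter, deque


def windowed_counts(batches, window_len):
    """Yield per-key counts over the most recent `window_len` batches."""
    window = deque()
    totals = Counter()
    for batch in batches:
        window.append(batch)
        totals.update(batch)                   # reduce: add the newest batch
        if len(window) > window_len:
            totals.subtract(window.popleft())  # inverse: remove the oldest batch
            totals = +totals                   # drop keys whose count fell to zero
        yield dict(totals)


batches = [["/a", "/b"], ["/a"], ["/c"]]
for counts in windowed_counts(batches, window_len=2):
    print(counts)
```

Because the inverse approach carries state from one window to the next, the streaming job needs a checkpoint directory, which is why the script calls `ssc.checkpoint(...)`.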

In [None]:
mkdir checkpoint
spark-submit --packages org.apache.spark:spark-streaming-flume_2.11:2.0.0 sparkFlume.py # this kicks off the job
pass

In [None]:
# kick off Flume
cd /usr/hdp/current/flume-server/
bin/flume-ng agent --conf conf --conf-file ~/sparkstreamingflume.conf --name a1
pass

In [None]:
# start up another terminal window to put something in there
wget http://media.sundog-soft.com/hadoop/access_log.txt
# Flume picks the file up once it is in the spool directory (/home/maria_dev/spool), and the results show up in the Spark window
pass

### Aggregating HTTP status codes with a five-minute window and a slide interval of 5 seconds


In [None]:
import re

from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtils

parts = [
    r'(?P<host>\S+)',                   # host %h
    r'\S+',                             # ident %l (unused)
    r'(?P<user>\S+)',                   # user %u
    r'\[(?P<time>.+)\]',                # time %t
    r'"(?P<request>.+)"',               # request "%r"
    r'(?P<status>[0-9]+)',              # status %>s
    r'(?P<size>\S+)',                   # size %b (careful, can be '-')
    r'"(?P<referer>.*)"',               # referer "%{Referer}i"
    r'"(?P<agent>.*)"',                 # user agent "%{User-agent}i"
]
pattern = re.compile(r'\s+'.join(parts)+r'\s*\Z')

def extractStatus(line):
    # Return the HTTP status code of a log line, or None if the line does not parse
    exp = pattern.match(line)
    if exp:
        status = exp.groupdict()["status"]
        if status:
            return status


if __name__ == "__main__":

    sc = SparkContext(appName="StreamingFlumeLogAggregator")
    sc.setLogLevel("ERROR")
    ssc = StreamingContext(sc, 5)  # 5-second batch interval

    flumeStream = FlumeUtils.createStream(ssc, "localhost", 9092)

    lines = flumeStream.map(lambda x: x[1])
    statuses = lines.map(extractStatus)

    # Reduce by HTTP status over a 5-minute window sliding every 5 seconds
    # (the slide interval must be a multiple of the batch interval)
    statusCounts = statuses.map(lambda x: (x, 1)).reduceByKeyAndWindow(lambda x, y: x + y, lambda x, y : x - y, 300, 5)

    # Sort and print the results
    sortedResults = statusCounts.transform(lambda rdd: rdd.sortBy(lambda x: x[1], False))
    sortedResults.pprint()

    ssc.checkpoint("/home/maria_dev/checkpoint")
    ssc.start()
    ssc.awaitTermination()
pass
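
To see what the regular expression pulls out of a line before wiring it into Spark, it can be run on its own. The sketch below reuses the pattern from the scripts above; the sample log line is made up for illustration and is not taken from `access_log.txt`.

```python
import re

parts = [
    r'(?P<host>\S+)',        # host %h
    r'\S+',                  # ident %l (unused)
    r'(?P<user>\S+)',        # user %u
    r'\[(?P<time>.+)\]',     # time %t
    r'"(?P<request>.+)"',    # request "%r"
    r'(?P<status>[0-9]+)',   # status %>s
    r'(?P<size>\S+)',        # size %b (can be '-')
    r'"(?P<referer>.*)"',    # referer
    r'"(?P<agent>.*)"',      # user agent
]
pattern = re.compile(r'\s+'.join(parts) + r'\s*\Z')

# Hypothetical line in Apache combined log format
sample = (
    '66.249.75.159 - - [29/Nov/2015:03:50:05 +0000] '
    '"GET /robots.txt HTTP/1.1" 200 55 "-" '
    '"Mozilla/5.0 (compatible; Googlebot/2.1; +http://www.google.com/bot.html)"'
)

fields = pattern.match(sample).groupdict()
print(fields["status"])              # 200
print(fields["request"].split()[1])  # /robots.txt
```

Lines that do not fit the format make `pattern.match` return `None`, which is why both scripts check the match object before reading from it.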

## Apache Storm
*   Another framework for processing continuous streams of data on a cluster
  *   Can run on top of YARN (like Spark)
*   Works on individual events, not micro-batches (as Spark Streaming does)
  *   If you need sub-second latency, Storm is appropriate.
### Storm terminology
*   A stream consists of tuples that flow through
*   Spouts are sources of stream data (Kafka, Twitter, etc.)
*   Bolts process stream data as it is received
  *   Transform, aggregate, write to database/HDFS
* A topology is a graph of spouts and bolts that process your stream
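
The terminology can be sketched with plain Python generators. This is not the Storm API, just an illustration of the shape of a topology: one spout feeding two chained bolts, with every tuple processed as soon as it arrives. All function names are hypothetical.

```python
def sentence_spout(sentences):
    """Spout: emits one tuple per incoming sentence."""
    for sentence in sentences:
        yield (sentence,)


def split_bolt(stream):
    """Bolt: transforms each sentence tuple into one tuple per word."""
    for (sentence,) in stream:
        for word in sentence.split():
            yield (word,)


def count_bolt(stream):
    """Bolt: keeps a running count and emits it for every word received."""
    counts = {}
    for (word,) in stream:
        counts[word] = counts.get(word, 0) + 1
        yield (word, counts[word])


# Topology: spout -> split bolt -> count bolt
topology = count_bolt(split_bolt(sentence_spout(["to be", "or not to be"])))
results = list(topology)
print(results)
```

Each word flows through both bolts individually, which mirrors the event-at-a-time model rather than micro-batching.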

## Flink
*   German for quick and nimble
*   Another stream processing engine, most similar to Storm
*   Can run on a standalone cluster, or on top of YARN or Mesos
*   Highly scalable (1000s of nodes)
*   Fault-tolerant
  *   Can survive failures while guaranteeing exactly-once processing
  *   Uses "state snapshots" to achieve this.
* Up and coming quickly
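
The idea behind state snapshots can be shown with a toy example: if the operator state and the input position are saved together, then after a failure both are rolled back together and the input is replayed, so no event is counted twice or lost. The sketch below is a heavily simplified single-process illustration, not Flink's actual distributed mechanism; `run_with_snapshots` is a hypothetical name.

```python
def run_with_snapshots(events, snapshot_every, fail_at=None):
    """Sum events; on a simulated failure, restore the last snapshot and replay."""
    snapshot = {"offset": 0, "total": 0}  # state and input position saved together
    offset, total = 0, 0
    failed = False
    while offset < len(events):
        if fail_at is not None and offset == fail_at and not failed:
            failed = True
            # Roll back state AND input position to the last snapshot
            offset, total = snapshot["offset"], snapshot["total"]
            continue
        total += events[offset]
        offset += 1
        if offset % snapshot_every == 0:
            snapshot = {"offset": offset, "total": total}
    return total


events = [1, 2, 3, 4, 5]
print(run_with_snapshots(events, snapshot_every=2))             # 15
print(run_with_snapshots(events, snapshot_every=2, fail_at=3))  # 15
```

The result is the same with or without the failure, because the replayed events are applied to the rolled-back total rather than on top of work that was already counted.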

# Designing Real-World Systems

## Working Backwards
*   Start from the end user's needs, not from where your data is coming from.
  *   Sometimes you may need to meet in the middle.
*   What sort of access pattern do you expect from the user?
  *   Analytical queries that span large data ranges?
  *   Huge numbers of small transactions for very specific rows of data?
  *   Both?
*   What availability do these end users demand?
*   What consistency do these end users demand?  

## Thinking about Requirements
*   Just how big is your big data?
  *   Do you really need a cluster?
*   How much internal infrastructure and expertise is available?
  *   Should you use AWS or something similar?
  *   Do systems you already know fit the bill?
*   What about data retention?
  *   Do you need to keep data around forever, for auditing?
  *   Or do you need to purge it for privacy?
*   What about security?
  *   Check with legal
*   Latency
  *   How quickly do end users need to get a response?
    *   Milliseconds? Then something like Cassandra or HBase will be needed.
*   Timeliness
  *   Can queries be based on day-old data? Minute-old?
    *   Oozie-scheduled jobs in Hive, Pig, Spark, etc. may cut it.
  *   Or must it be near-real-time?
    *   Use Spark Streaming / Storm / Flink with Kafka or Flume

## Judicious Future-Proofing
*   Once you have decided where to store your data, moving it later on will be difficult
  *   Think carefully before choosing proprietary or cloud-based solutions.
*   Will business analysts want your data in addition to end users (or vice versa)?

## Cheat to win
*   Does your organisation have existing components you can use?
  *   Don't build a data warehouse if you already have one.
  *   Rebuilding existing technology always has negative business value.
*   What's the least amount of infrastructure you need to build?
  *   Import existing data with Sqoop, etc. if you can
  *   If relaxing a requirement saves time and money, at least ask.