# Working with Apache Hudi Deltastreamer

The HoodieDeltaStreamer utility is part of hudi-utilities-bundle and provides a way to ingest data from sources such as DFS or Kafka.

In this notebook, you will learn to use the DeltaStreamer utility to bulk insert data into a Hudi dataset as a Copy on Write (CoW) table (and as a Merge on Read [MoR] table), apply change data capture (CDC) to the tables, and query the Hudi dataset using Hive.

We will run queries in hudi-cli to verify that the tables and subsequent updates are incorporated into our data lake on Amazon S3.

Let's get started!

## Generate Data

### Install Python Faker 

In [None]:
!pip install Faker

### Fake Profile Generator

The fake profile generator uses Python's [Faker](https://faker.readthedocs.io/en/master/index.html) library. Let's define functions that generate a number of random person profiles.

In [None]:
import os
import json
import random
import boto3
import io
from io import StringIO  # used by generate_bulk_data() and generate_updates()
from faker import Faker
from faker.providers import date_time, credit_card
from json import dumps


# Initialize the Faker library and the S3 client
fake = Faker() 
fake.add_provider(date_time)
fake.add_provider(credit_card)

s3 = boto3.resource('s3')

# The fake profile data is written to an S3 bucket
# Replace with your own bucket
s3_bucket = "mybucket"
s3_load_prefix = 'hudi-ds/inputdata/'
s3_update_prefix = 'hudi-ds/updates/'

# Number of records in each file and number of files
# Adjust to your needs; the commented-out values below produce files of about 40 MB each
#num_records = 150000
#num_files = 50

num_records = 7
num_files = 3

def generate_bulk_data():
    '''
    Generates bulk profile data
    '''
    # Generate number of files equivalent to num_files
    for i in range(num_files):
        fake_profile_data = fake_profile_generator(num_records, fake)
        fakeIO = StringIO()
        filename = 'profile_' + str(i + 1) + '.json'
        s3key = s3_load_prefix + filename 

        fakeIO.write(str(''.join(dumps_lines(fake_profile_data))))

        s3object = s3.Object(s3_bucket, s3key)
        s3object.put(Body=(bytes(fakeIO.getvalue().encode('UTF-8'))))
        fakeIO.close()

def generate_updates():
    '''
    Generates updates for the profiles
    '''
    #
    # Update the records in every generated file except the last one:
    # range(1, num_files) yields profile_1.json .. profile_<num_files - 1>.json
    #
    random_file_list = []
    
    for i in range(1, num_files):
        random_file_list.append('profile_' + str(i) + '.json')
    
    for f in random_file_list:
        print(f)
        s3key = s3_load_prefix + f
        obj = s3.Object(s3_bucket, s3key)
        profile_data = obj.get()['Body'].read().decode('utf-8')
        
        # The file holds one JSON document per line, so split it into lines first
        stringIO_data = io.StringIO(profile_data)
        data = stringIO_data.readlines()

        # Parse each line into a dict
        json_data = list(map(json.loads, data))

        fakeIO = StringIO()
        s3key = s3_update_prefix + f
        fake_profile_data = []
        
        for rec in json_data:
            # Let's generate a new address
            print ("old address: " + rec['street_address'])
            rec['street_address'] = fake.address()
            print ("new address: " + rec['street_address'])
            fake_profile_data.append(rec)
            
        fakeIO.write(str(''.join(dumps_lines(fake_profile_data))))
        s3object = s3.Object(s3_bucket, s3key)
        s3object.put(Body=(bytes(fakeIO.getvalue().encode('UTF-8'))))
        fakeIO.close()

def fake_profile_generator(length, fake, new_address=""):
    """
    Generates fake profiles
    """
    for x in range (length):       
        yield {'Name': fake.name(),
               'phone': fake.phone_number(),
               'job': fake.job(),
               'company': fake.company(),
               'ssn': fake.ssn(),
               'street_address': (new_address if new_address else fake.address()),
               'dob': (fake.date_of_birth(tzinfo=None, minimum_age=21, maximum_age=105).isoformat()),
               'email': fake.email(),
               'ts': (fake.date_time_between(start_date='-10y', end_date='now', tzinfo=None).isoformat()),
               'credit_card': fake.credit_card_number(),
               'record_id': fake.pyint(),
               'id': fake.uuid4()}
        
def dumps_lines(objs):
    for obj in objs:
        yield json.dumps(obj, separators=(',',':')) + '\n'   

### Start the data generator

The following code kicks off the fake data generator, which produces `num_files` files of `num_records` records each in JSON format (one record per line). The files are written to the specified S3 bucket.

In [None]:
generate_bulk_data()
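
Each generated file is newline-delimited JSON: one compact JSON document per line. The following minimal sketch reproduces that format with two hand-written records (the values are illustrative, not Faker output) and reads it back the same way `generate_updates()` does. Addresses from Faker typically contain a line break, so the sample addresses do too; `json.dumps` escapes it, which keeps every record on a single line.

```python
import io
import json


def dumps_lines(objs):
    """Serialize each record as compact JSON followed by a newline."""
    for obj in objs:
        yield json.dumps(obj, separators=(',', ':')) + '\n'


# Two hand-written records standing in for generated profiles (illustrative values)
records = [
    {'id': 'a-1', 'Name': 'Jane Doe',
     'street_address': '1 Main St\nSpringfield', 'record_id': 17},
    {'id': 'b-2', 'Name': 'John Roe',
     'street_address': '2 Oak Ave\nShelbyville', 'record_id': 42},
]

# This is the text that would be uploaded as the S3 object body
payload = ''.join(dumps_lines(records))

# Reading back: split into lines, then parse each line as its own JSON document
parsed = [json.loads(line) for line in io.StringIO(payload).readlines()]

print(payload)
print(parsed == records)
```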

## Copy Hudi Libraries on the EMR Cluster and create Hive table

0. For the following steps to work, the EMR cluster must have been launched with the permissions required by **Systems Manager Session Manager**.
1. In the AWS Console, type SSM in the search box and navigate to the **AWS Systems Manager console**.
2. On the left-hand side, select **Session Manager** from the **Instances and Nodes** section.
3. Click **Start session**; you should see two EC2 instances listed.
4. Select the instance ID of the **EMR master node** and click **Start session**.
5. In the terminal, run the following to switch to the *hadoop* user and copy the Hudi libraries to HDFS:
 
```bash
sh-4.2$ sudo su hadoop
[hadoop@ip-10-0-2-73 /]$ cd
[hadoop@ip-10-0-2-73 ~]$ hdfs dfs -mkdir -p /apps/hudi/lib
[hadoop@ip-10-0-2-73 ~]$ hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-spark-bundle.jar /apps/hudi/lib/hudi-spark-bundle.jar
[hadoop@ip-10-0-2-73 ~]$ hdfs dfs -copyFromLocal /usr/lib/spark/external/lib/spark-avro.jar /apps/hudi/lib/spark-avro.jar
[hadoop@ip-10-0-2-73 ~]$ hdfs dfs -copyFromLocal /usr/lib/hudi/hudi-utilities-bundle.jar /apps/hudi/lib/hudi-utilities-bundle.jar
[hadoop@ip-10-0-2-73 ~]$ hdfs dfs -ls /apps/hudi/lib/
Found 3 items
-rw-r--r--   2 hadoop hadoop   20967361 2020-03-10 04:10 /apps/hudi/lib/hudi-spark-bundle.jar
-rw-r--r--   2 hadoop hadoop   39051878 2020-03-10 04:27 /apps/hudi/lib/hudi-utilities-bundle.jar
-rw-r--r--   2 hadoop hadoop     187458 2020-03-10 04:11 /apps/hudi/lib/spark-avro.jar
```

6. Create an external table in Hive as follows:

```
hive> CREATE EXTERNAL TABLE profile_test_cow(Name string,
    > phone string,
    > job string,
    > company string,
    > ssn string,
    > street_address string,
    > dob string,
    > email string,
    > ts string,
    > credit_card string,
    > record_id int,
    > id string)
    > ROW FORMAT SERDE
    > 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
    > STORED AS INPUTFORMAT
    > 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
    > OUTPUTFORMAT
    > 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
    > LOCATION
    > 's3://<my-bucket>/hudi-ds/output/profile-test-out/';
OK
Time taken: 3.575 seconds
```


## Run DeltaStreamer to write a Copy on Write (COW) table

We will now run the DeltaStreamer utility as an EMR step to write the JSON-formatted data generated above into a Hudi dataset. To do that, we will need the following:

* A properties file on the local file system or DFS, with configurations for the Hudi client, schema provider, key generator, and data source
* A schema file for the source dataset
* A schema file for the target dataset
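
As a rough illustration of the files listed above, the sketch below builds an Avro record schema from the profile fields and renders a small properties file. The schema name, namespace, `.avsc` file names, and the choice of `id` as the record key are assumptions made for this example. A working configuration also needs partition path and key generator settings, which depend on how you want the table laid out and are omitted here; verify all property keys against the Hudi documentation for your release.

```python
import json

# Field names and types mirror the generated profile records
FIELDS = [
    ('Name', 'string'), ('phone', 'string'), ('job', 'string'),
    ('company', 'string'), ('ssn', 'string'), ('street_address', 'string'),
    ('dob', 'string'), ('email', 'string'), ('ts', 'string'),
    ('credit_card', 'string'), ('record_id', 'int'), ('id', 'string'),
]


def build_avro_schema(name='profile', namespace='example.hudi'):
    """Return an Avro record schema as a dict (name and namespace are assumptions)."""
    return {
        'type': 'record',
        'name': name,
        'namespace': namespace,
        'fields': [{'name': n, 'type': t} for n, t in FIELDS],
    }


def build_properties(bucket):
    """Render a partial properties file; file names and record key are assumptions."""
    config = f's3://{bucket}/hudi-ds/config'
    props = {
        # Assumption: the UUID in 'id' uniquely identifies a profile
        'hoodie.datasource.write.recordkey.field': 'id',
        # Hypothetical schema file names
        'hoodie.deltastreamer.schemaprovider.source.schema.file': f'{config}/source-schema.avsc',
        'hoodie.deltastreamer.schemaprovider.target.schema.file': f'{config}/target-schema.avsc',
        # Input location written by generate_bulk_data()
        'hoodie.deltastreamer.source.dfs.root': f's3://{bucket}/hudi-ds/inputdata',
    }
    return ''.join(f'{key}={value}\n' for key, value in props.items())


schema_text = json.dumps(build_avro_schema(), indent=2)
properties_text = build_properties('mybucket')
print(schema_text)
print(properties_text)
```

Both strings can then be uploaded to S3 in the same way the notebook uploads the profile files.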

To run DeltaStreamer, submit an EMR step with a command of the following form:

```
aws emr add-steps --cluster-id j-XXXXZOOXXXXX --steps Type=Spark,Name="Deltastreamer COW",ActionOnFailure=CONTINUE,Args=[--class,org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer,hdfs:///apps/hudi/lib/hudi-utilities-bundle.jar,--props,s3://<my-bucket>/config/json-deltastreamer.properties,--table-type,COPY_ON_WRITE,--source-class,org.apache.hudi.utilities.sources.JsonDFSSource,--source-ordering-field,ts,--target-base-path,s3://<my-bucket>/hudi-ds-output/person-profile-out1,--target-table,person_profile_cow,--schemaprovider-class,org.apache.hudi.utilities.schema.FilebasedSchemaProvider,--op,UPSERT] --region us-east-1
```


Replace the following values in the above command using a text editor:

1. Replace the `--cluster-id` value with the ID of your EMR cluster.
2. In the `--props` value, replace `<my-bucket>` with your S3 bucket name.
3. In the `--target-base-path` value, replace `<my-bucket>` with your S3 bucket name.
4. After replacing the values, copy the entire command and run it in the next cell.
5. If the values were replaced correctly, a step ID is displayed as the output.
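
The substitutions above can also be scripted. The helper below is a hypothetical convenience function (it is not part of the AWS CLI or Hudi) that assembles an `aws emr add-steps` command from a cluster ID and a bucket name. It uses only arguments that appear in this notebook, and its S3 paths follow the layout used in the next cell; adjust them if your layout differs.

```python
def build_add_steps_command(cluster_id, bucket, op='BULK_INSERT',
                            step_name='Deltastreamer Profile COW',
                            region='us-east-1'):
    """Assemble the add-steps command as a single string (hypothetical helper)."""
    base = 's3://{}/hudi-ds'.format(bucket)
    args = [
        '--class', 'org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer',
        'hdfs:///apps/hudi/lib/hudi-utilities-bundle.jar',
        '--props', base + '/config/json-deltastreamer-upserts.properties',
        '--table-type', 'COPY_ON_WRITE',
        '--source-class', 'org.apache.hudi.utilities.sources.JsonDFSSource',
        '--source-ordering-field', 'ts',
        '--target-base-path', base + '/output/profile-test-out',
        '--target-table', 'profile_test_cow',
        '--schemaprovider-class',
        'org.apache.hudi.utilities.schema.FilebasedSchemaProvider',
        '--op', op,
    ]
    # The EMR shorthand syntax expects the arguments as one comma-separated list
    step = 'Type=Spark,Name="{}",ActionOnFailure=CONTINUE,Args=[{}]'.format(
        step_name, ','.join(args))
    return 'aws emr add-steps --cluster-id {} --steps {} --region {}'.format(
        cluster_id, step, region)


# 'j-EXAMPLE' and 'my-bucket' are placeholders, not real resources
command = build_add_steps_command('j-EXAMPLE', 'my-bucket')
print(command)
```

Printing the command before running it makes it easy to check the substituted values by eye.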



In [None]:
! aws emr add-steps --cluster-id j-1QXXXXXXXXX --steps Type=Spark,Name="Deltastreamer Profile COW",ActionOnFailure=CONTINUE,Args=[--class,org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer,hdfs:///apps/hudi/lib/hudi-utilities-bundle.jar,--props,s3://aws-test123/hudi-ds/config/json-deltastreamer-upserts.properties,--table-type,COPY_ON_WRITE,--source-class,org.apache.hudi.utilities.sources.JsonDFSSource,--source-ordering-field,ts,--target-base-path,s3://aws-test123/hudi-ds/output/profile-test-out,--target-table,profile_test_cow,--schemaprovider-class,org.apache.hudi.utilities.schema.FilebasedSchemaProvider,--op,BULK_INSERT] --region us-east-1
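
The step ID returned by `add-steps` can be used to check when the ingestion has finished. The sketch below polls the step state through a boto3 EMR client's `describe_step` call; the function name, the polling interval, and the retry limit are assumptions for illustration. The client is passed in as a parameter so that the function itself needs no AWS access to be defined.

```python
import time

# Step states after which no further change is expected
TERMINAL_STATES = {'COMPLETED', 'CANCELLED', 'FAILED', 'INTERRUPTED'}


def wait_for_step(emr_client, cluster_id, step_id, poll_seconds=30, max_polls=120):
    """Poll an EMR step until it reaches a terminal state and return that state."""
    for _ in range(max_polls):
        response = emr_client.describe_step(ClusterId=cluster_id, StepId=step_id)
        state = response['Step']['Status']['State']
        if state in TERMINAL_STATES:
            return state
        time.sleep(poll_seconds)
    raise TimeoutError('step {} did not finish after {} polls'.format(step_id, max_polls))
```

In the notebook this could be called as `wait_for_step(boto3.client('emr', region_name='us-east-1'), cluster_id, step_id)`, with the cluster ID and step ID from the previous cell. Any state other than `COMPLETED` means the table should not be queried yet.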

## Query the Hudi Dataset

To query the Hudi dataset, you can do one of the following:

- Navigate to another sparkmagic notebook and run queries in Spark
- SSH to the master node (or use SSM Session Manager if you launched your cluster with SSM permissions) and run queries using Hive or Presto
- Open the Hue console on Amazon EMR and run queries

## Run updates

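You can now generate updates for the fake profile data. `generate_updates()` reads the generated files back from S3, replaces the `street_address` of every record, and writes the modified records under the update prefix.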

In [None]:
generate_updates()

## Run DeltaStreamer to apply updates

We will now run DeltaStreamer again, this time with the `UPSERT` operation, to apply the updates generated in the previous step.

```
aws emr add-steps --cluster-id j-XXXXXXX --steps Type=Spark,Name="Deltastreamer Profile Upserts",ActionOnFailure=CONTINUE,Args=[--class,org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer,hdfs:///apps/hudi/lib/hudi-utilities-bundle.jar,--props,s3://<my-bucket>/hudi-ds/config/json-deltastreamer.properties,--table-type,COPY_ON_WRITE,--source-class,org.apache.hudi.utilities.sources.JsonDFSSource,--source-ordering-field,ts,--target-base-path,s3://<my-bucket>/hudi-ds/output/profile-test15-out,--target-table,profile_test15_cow,--schemaprovider-class,org.apache.hudi.utilities.schema.FilebasedSchemaProvider,--op,UPSERT] --region us-east-1
```

In [None]:
! aws emr add-steps --cluster-id j-1XXXXXXXXX --steps Type=Spark,Name="Deltastreamer Profile Upserts",ActionOnFailure=CONTINUE,Args=[--class,org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer,hdfs:///apps/hudi/lib/hudi-utilities-bundle.jar,--props,s3://<my-bucket>/hudi-ds/config/json-deltastreamer-upserts.properties,--table-type,COPY_ON_WRITE,--source-class,org.apache.hudi.utilities.sources.JsonDFSSource,--source-ordering-field,ts,--target-base-path,s3://<my-bucket>/hudi-ds/output/profile-test-out,--target-table,profile_test_cow,--schemaprovider-class,org.apache.hudi.utilities.schema.FilebasedSchemaProvider,--op,UPSERT] --region us-east-1
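
To build intuition for what the `UPSERT` operation and `--source-ordering-field ts` do, here is a simplified in-memory model. It assumes overwrite-with-latest behavior: duplicates of a key within the incoming batch are reduced to the record with the greatest `ts`, and the survivor then replaces the stored record with the same key or is inserted if the key is new. This is only a sketch of the idea, not Hudi's implementation; the function names and sample records are made up for the example.

```python
def precombine(batch, key_field='id', ordering_field='ts'):
    """Keep one record per key from the batch: the one with the greatest ordering value."""
    latest = {}
    for rec in batch:
        key = rec[key_field]
        # In this sketch, ties go to the record seen later in the batch
        if key not in latest or rec[ordering_field] >= latest[key][ordering_field]:
            latest[key] = rec
    return latest


def upsert(table, batch, key_field='id', ordering_field='ts'):
    """Return a new table in which incoming records replace or extend stored ones."""
    merged = dict(table)  # shallow copy, so the input table is left untouched
    merged.update(precombine(batch, key_field, ordering_field))
    return merged


# Stored records, keyed by record key (illustrative values)
table = {
    'a': {'id': 'a', 'street_address': 'a old', 'ts': '2020-01-01T00:00:00'},
    'b': {'id': 'b', 'street_address': 'b old', 'ts': '2020-01-01T00:00:00'},
}

# Incoming batch: two versions of 'a' and a brand new key 'c'
batch = [
    {'id': 'a', 'street_address': 'new 1', 'ts': '2020-01-01T00:00:00'},
    {'id': 'a', 'street_address': 'new 2', 'ts': '2020-02-01T00:00:00'},
    {'id': 'c', 'street_address': 'c new', 'ts': '2020-03-01T00:00:00'},
]

result = upsert(table, batch)
for key in sorted(result):
    print(key, result[key]['street_address'])
```

The ISO-formatted timestamps compare correctly as plain strings here because they all share the same layout.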

## Query the updated Hudi Dataset

To query the updated Hudi dataset, you can do one of the following:

- Navigate to another sparkmagic notebook and run queries in Spark
- SSH to the master node (or use SSM Session Manager if you launched your cluster with SSM permissions) and run queries using Hive or Presto
- Open the Hue console on Amazon EMR and run queries
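
One way to confirm that the upserts landed is to look up a few profiles by `id` and compare their `street_address` with the "new address" values printed by `generate_updates()`. The hypothetical helper below builds such a query for the `profile_test_cow` table created earlier; it is a sketch that only produces the SQL text, which you can paste into Hive, Presto, or Hue. The IDs are validated as UUIDs and the table name as a plain identifier before they are placed in the statement.

```python
import re
import uuid


def build_verification_query(table, record_ids):
    """Build a SELECT that fetches the address of the given profile ids."""
    if not re.fullmatch(r'[A-Za-z_][A-Za-z0-9_]*', table):
        raise ValueError('unexpected table name: {!r}'.format(table))
    # uuid.UUID raises ValueError for anything that is not a valid UUID
    quoted = ', '.join("'{}'".format(uuid.UUID(rid)) for rid in record_ids)
    return ('SELECT id, street_address, ts FROM {} '
            'WHERE id IN ({}) ORDER BY id'.format(table, quoted))


# Placeholder id; use ids taken from your own generated data
query = build_verification_query(
    'profile_test_cow', ['123e4567-e89b-12d3-a456-426614174000'])
print(query)
```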