# Option 1: Use the HDFS CLI (from the command line or Python)
### Copy data To and From the local file system and work from there
#### **Applicable to: "small and medium" datasets, i.e. small enough to be easily managed/processed locally.**  
NOTE: NOT to be used with sensitive data and/or data that requires strong governance, as copying data locally creates a break in the data chain of custody / security.  
This is not a best practice in terms of Data Management/Governance.

### CML data storage

There are 2 types of storage in CML:  
- **Local Storage**: linked to the project  
  => stores scripts and (optionally) small datasets
- **Shared Storage** : This is the object store (S3 for AWS / ADLS for AZURE) that is linked to the Environment that CML is attached to.   
  => default storage for datasets  
  
It is good practice to declare the default environment bucket linked to a CML workspace in a *global* environment variable (Admin -> Engine -> Env Variable).
In this case I used the `$STORAGE` env variable.
By default, data is stored in the `$STORAGE/datalake` subfolder, for example: `s3a://demo-aws-1/datalake/`
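The lookup performed in the next cell can also be packaged as a small reusable function. This is a minimal sketch, assuming `STORAGE` holds the bucket root (e.g. `s3a://demo-aws-2`); the function name `datalake_root` and its fallback value are illustrative, not part of CML.

```python
import os

def datalake_root(fallback_bucket="s3a://demo-aws-2", var_name="STORAGE"):
    """Return '<bucket root>/datalake/', read from an env variable with a fallback.

    fallback_bucket mirrors the example bucket used in this notebook (an assumption);
    replace it with the bucket of your own environment.
    """
    bucket = os.environ.get(var_name) or fallback_bucket
    # Strip trailing slashes so the result never contains '//datalake'
    return bucket.rstrip("/") + "/datalake/"

# Usage: build a path inside the datalake folder
print(datalake_root() + "tmp/airports.csv")
```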

In [1]:
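import os
ENV_BUCKET="s3a://demo-aws-2"  # fallback bucket root, used only if $STORAGE is not set

try:
  # $STORAGE holds the bucket root, e.g. s3a://demo-aws-2
  DL_s3bucket=os.environ["STORAGE"]+"/datalake/"
except KeyError:
  # Set STORAGE to the bucket root (not the datalake folder) so that later cells
  # building os.environ["STORAGE"]+"/datalake/" point to the right location
  os.environ["STORAGE"]=ENV_BUCKET
  DL_s3bucket=ENV_BUCKET+"/datalake/"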

In [None]:
%%bash
## Cleanup - delete the files if they exist
## Note: the $STORAGE env variable holds the root of the default environment bucket linked to the CML workspace
hdfs dfs -rm -f $STORAGE/datalake/tmp/1988.csv.bz2
hdfs dfs -rm -f $STORAGE/datalake/tmp/airports.csv
hdfs dfs -rm -f $STORAGE/datalake/tmp/carriers_python.csv
hdfs dfs -rm -f $STORAGE/datalake/tmp/airports_pipe.csv

## 1. Copy data TO HDFS from local file system
### Using the command line (works in workbench as well)

In [2]:
%%bash
### Copy to HDFS - using HDFS client - from Bash
# NOTE: Only functional from a Bash script or an interactive session 
hdfs dfs -copyFromLocal -f /home/cdsw/airlines/airports/airports.csv $STORAGE/datalake/tmp/
hdfs dfs -ls $STORAGE/datalake/tmp

Found 7 items
drwxrwxrwx   - mlamairesse mlamairesse          0 2020-04-16 15:14 s3a://demo-aws-2/datalake/tmp/airlines
-rw-rw-rw-   1 mlamairesse mlamairesse     244438 2020-04-16 15:14 s3a://demo-aws-2/datalake/tmp/airports.csv
-rw-rw-rw-   1 mlamairesse mlamairesse      43758 2020-04-16 13:52 s3a://demo-aws-2/datalake/tmp/carriers.csv
drwxrwxrwx   - mlamairesse mlamairesse          0 2020-04-16 15:14 s3a://demo-aws-2/datalake/tmp/models
drwxrwxrwx   - mlamairesse mlamairesse          0 2020-04-16 15:14 s3a://demo-aws-2/datalake/tmp/wine_pred
drwxrwxrwx   - mlamairesse mlamairesse          0 2020-04-16 15:14 s3a://demo-aws-2/datalake/tmp/wine_pred_hive
drwxrwxrwx   - mlamairesse mlamairesse          0 2020-04-16 15:14 s3a://demo-aws-2/datalake/tmp/wine_predicted.parquet



### Using Python - Write data to HDFS with the HDFS CLI (via subprocess)

In [3]:
### local and HDFS Paths 
import os 
shared_root=os.environ["STORAGE"]+"/datalake/"
local_path="/home/cdsw/airlines/carriers/carriers.csv"
shared_path=shared_root+"tmp/carriers_python.csv"
print(shared_path)

s3a://demo-aws-2/datalake/tmp/carriers_python.csv


In [4]:
from subprocess import Popen, PIPE

def hdfs_write(local_path, hdfs_path):
    """Copy a local file to HDFS / the object store with the Hadoop CLI (via subprocess)."""
    # -f: overwrite the destination file if it already exists
    put = Popen(["hadoop", "fs", "-put", "-f", local_path, hdfs_path], stdout=PIPE, stderr=PIPE)
    stdout, stderr = put.communicate()

    ## Error handling: surface the CLI error message if the copy failed
    if put.returncode != 0:
        raise IOError(stderr.decode("utf-8", errors="replace"))

hdfs_write(local_path, shared_path)

# List the target folder to check that the file was written
!hdfs dfs -ls $STORAGE/datalake/tmp

Found 8 items
drwxrwxrwx   - mlamairesse mlamairesse          0 2020-04-16 15:14 s3a://demo-aws-2/datalake/tmp/airlines
-rw-rw-rw-   1 mlamairesse mlamairesse     244438 2020-04-16 15:14 s3a://demo-aws-2/datalake/tmp/airports.csv
-rw-rw-rw-   1 mlamairesse mlamairesse      43758 2020-04-16 13:52 s3a://demo-aws-2/datalake/tmp/carriers.csv
-rw-rw-rw-   1 mlamairesse mlamairesse      43758 2020-04-16 15:14 s3a://demo-aws-2/datalake/tmp/carriers_python.csv
drwxrwxrwx   - mlamairesse mlamairesse          0 2020-04-16 15:14 s3a://demo-aws-2/datalake/t
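On Python 3.5+, `subprocess.run` covers the Popen/communicate pattern of `hdfs_write` in a single call. The sketch below separates command building from execution so the error handling can be exercised without a Hadoop client; `run_cli` and `hdfs_put_cmd` are hypothetical helper names.

```python
import subprocess
import sys

def run_cli(cmd):
    """Run a command (list of arguments); return its stdout or raise IOError with its stderr."""
    result = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE,
                            universal_newlines=True)
    if result.returncode != 0:
        raise IOError("{} exited with code {}: {}".format(
            cmd[0], result.returncode, result.stderr.strip()))
    return result.stdout

def hdfs_put_cmd(local_path, hdfs_path, overwrite=True):
    """Build the `hadoop fs -put` argument list used by hdfs_write above."""
    cmd = ["hadoop", "fs", "-put"]
    if overwrite:
        cmd.append("-f")  # overwrite the destination if it already exists
    return cmd + [local_path, hdfs_path]

# In the CML session (Hadoop client on the PATH):
# run_cli(hdfs_put_cmd(local_path, shared_path))

# Local smoke test without Hadoop: run the current Python interpreter instead
print(run_cli([sys.executable, "-c", "print('ok')"]).strip())
```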

**Tip:** Use a pipe to download files directly to HDFS (command line)

In [5]:
%%bash
## NOTE: Only functional from a Bash script or an interactive session 
export DOWNLOAD_LINK='http://stat-computing.org/dataexpo/2009/airports.csv'
curl $DOWNLOAD_LINK | hadoop fs -put - $STORAGE/datalake/tmp/airports_pipe.csv

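The same streaming idea can be driven from Python: read the HTTP response in chunks and feed it to `hadoop fs -put -` through stdin, so no local copy is written. This is a sketch under the assumption that the Hadoop client is on the PATH; `stream_to_cli` and `download_to_hdfs` are hypothetical names.

```python
import shutil
import subprocess
import urllib.request

def stream_to_cli(src, cmd, chunk_size=1024 * 1024):
    """Copy a binary file-like object into a command's stdin, chunk by chunk.

    Nothing is written to the local disk, mirroring `curl ... | hadoop fs -put - <dest>`.
    The command's stderr (e.g. Hadoop logs) is not captured; it goes to the parent's stderr.
    """
    proc = subprocess.Popen(cmd, stdin=subprocess.PIPE)
    try:
        shutil.copyfileobj(src, proc.stdin, chunk_size)
    finally:
        proc.stdin.close()  # signal end of input so the command can finish
    returncode = proc.wait()
    if returncode != 0:
        raise IOError("{} exited with code {}".format(cmd[0], returncode))

def download_to_hdfs(url, hdfs_path):
    """Hypothetical helper: stream an HTTP download straight to HDFS / the object store."""
    with urllib.request.urlopen(url) as response:
        stream_to_cli(response, ["hadoop", "fs", "-put", "-", hdfs_path])

# Usage in the CML session, with the same URL and target as the bash cell above:
# download_to_hdfs("http://stat-computing.org/dataexpo/2009/airports.csv",
#                  os.environ["STORAGE"] + "/datalake/tmp/airports_pipe.csv")
```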

In [6]:
!hdfs dfs -ls $STORAGE/datalake/tmp/

Found 9 items
drwxrwxrwx   - mlamairesse mlamairesse          0 2020-04-16 15:14 s3a://demo-aws-2/datalake/tmp/airlines
-rw-rw-rw-   1 mlamairesse mlamairesse     244438 2020-04-16 15:14 s3a://demo-aws-2/datalake/tmp/airports.csv
-rw-rw-rw-   1 mlamairesse mlamairesse     244438 2020-04-16 15:14 s3a://demo-aws-2/datalake/tmp/airports_pipe.csv
-rw-rw-rw-   1 mlamairesse mlamairesse      43758 2020-04-16 13:52 s3a://demo-aws-2/datalake/tmp/carriers.csv
-rw-rw-rw-   1 mlamairesse mlamairesse      43758 2020-04-16 15:14 s3a://demo-aws-2/datalake/tmp

# Reading / Writing files from S3 in CML

## Option 1 - Use Spark
### **Applicable to:** all datasets, and large ones in particular <br> 
IDBroker integration allows direct access to the S3 buckets associated with the environment

### Use Case 1. Reading data from an S3 bucket associated with the Environment

### Start the Spark session
Custom session configuration can be defined either in the session parameters (as below) OR
inside a `spark-defaults.conf` file stored at the root of the project (in which case the configs become project-wide).  
**NOTE:** Buckets to be accessed must be listed in the Spark configuration `spark.yarn.access.hadoopFileSystems`.  
Below is an example that accesses the default bucket:

In [7]:
import os
shared_root=os.environ["STORAGE"]+"/datalake/"
print("Bucket accessed: "+shared_root)

from pyspark.sql import SparkSession

spark = SparkSession\
  .builder\
  .appName('Airline')\
  .config("spark.executor.memory","2g")\
  .config("spark.executor.cores","2")\
  .config("spark.executor.instances","3")\
  .config("spark.yarn.access.hadoopFileSystems",shared_root)\
  .getOrCreate()

Bucket accessed: s3a://demo-aws-2/datalake/
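`spark.yarn.access.hadoopFileSystems` takes a comma-separated list of filesystem URIs, so several buckets can be declared at once. A minimal helper to build that value, assuming each bucket URI is already known (the second bucket in the usage comment is hypothetical). On Spark 3.x, the same setting is also exposed as `spark.kerberos.access.hadoopFileSystems`, with the YARN-specific name deprecated.

```python
def hadoop_fs_access_list(*buckets):
    """Join bucket URIs into the comma-separated value used by spark.yarn.access.hadoopFileSystems."""
    cleaned = (b.strip() for b in buckets if b and b.strip())
    # dict.fromkeys drops duplicates while keeping the original order (Python 3.7+)
    return ",".join(dict.fromkeys(cleaned))

# Usage with a hypothetical second bucket:
# .config("spark.yarn.access.hadoopFileSystems",
#         hadoop_fs_access_list(shared_root, "s3a://another-bucket/"))
```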


In [8]:
## Adding a link to the Spark UI for demo purposes
## Also available in the session tab
from IPython.display import HTML
import os
HTML('<a href="http://spark-{}.{}" target="_blank" >Spark UI</a>'.\
     format(os.getenv("CDSW_ENGINE_ID"),os.getenv("CDSW_DOMAIN")))

### Read Data - CSV file stored on HDFS 

In [9]:
import os
shared_root=os.environ["STORAGE"]+"/datalake/"
shared_path=shared_root+'tmp/airports.csv' #bucket location
print("location read: "+shared_path)

location read: s3a://demo-aws-2/datalake/tmp/airports.csv


In [10]:
airports_df = spark.read.csv(
    path=shared_path,
    header=True,
    sep=',',
    inferSchema=True,
    nullValue=None
)
airports_df.printSchema()

root
 |-- iata: string (nullable = true)
 |-- airport: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)



**Note**: in the above example, I'm inferring the schema from the file. <br>
It's actually good practice to set the schema explicitly: it prevents erroneous type casting and avoids the extra pass over the data that `inferSchema` requires.
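To see the kind of erroneous casting meant here, a small local illustration with pandas, whose CSV type inference has the same pitfall: codes that merely look numeric are parsed as integers and lose their leading zeros. The CSV content is made up for the example.

```python
import io
import pandas as pd

# Hypothetical CSV where the airport codes happen to look numeric
csv_text = "iata,lat\n001,31.95\n002,30.68\n"

# Types inferred from the data: the codes become integers
inferred = pd.read_csv(io.StringIO(csv_text))
# Types set explicitly: the codes stay strings
explicit = pd.read_csv(io.StringIO(csv_text), dtype={"iata": str, "lat": float})

print(inferred["iata"].tolist())  # leading zeros are lost
print(explicit["iata"].tolist())  # original codes are preserved
```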

In [11]:
from pyspark.sql.types import *

schema = StructType([StructField("iata", StringType(), True),
                     StructField("airport", StringType(), True),
                     StructField("city", StringType(), True),
                     StructField("state", StringType(), True),
                     StructField("country", StringType(), True),
                     StructField("lat",  DoubleType(), True),
                     StructField("long",  DoubleType(), True)
                    ])

airports_df = spark.read.csv(
    path=shared_path,
    schema=schema,
    header=True,
    sep=',',
    nullValue=None
).cache()
airports_df.show(5)

+----+--------------------+----------------+-----+-------+-----------+------------+
|iata|             airport|            city|state|country|        lat|        long|
+----+--------------------+----------------+-----+-------+-----------+------------+
| 00M|            Thigpen |     Bay Springs|   MS|    USA|31.95376472|-89.23450472|
| 00R|Livingston Municipal|      Livingston|   TX|    USA|30.68586111|-95.01792778|
| 00V|         Meadow Lake|Colorado Springs|   CO|    USA|38.94574889|-104.5698933|
| 01G|        Perry-Warsaw|           Perry|   NY|    USA|42.74134667|-78.05208056|
| 01J|    Hilliard Airpark|        Hilliard|   FL|    USA| 30.6880125|-81.90594389|
+----+--------------------+----------------+-----+-------+-----------+------------+
only showing top 5 rows



### (optional) Transform data to Pandas Dataframe
#### Once converted, **ALL DATA is brought locally** and distributed processing ends
* **Applicable to: SMALL to MEDIUM size datasets**, i.e. datasets that can be easily managed/processed locally
* When working with **LARGE datasets**, **data should be sampled** before bringing it locally

> **Good Practice**: stop the Spark session with `spark.stop()` to release cluster resources once the data is copied

In [12]:
#without sampling
import pandas 
airport_pandas_df = airports_df.toPandas()
airport_pandas_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3376 entries, 0 to 3375
Data columns (total 7 columns):
iata       3376 non-null object
airport    3376 non-null object
city       3376 non-null object
state      3376 non-null object
country    3376 non-null object
lat        3376 non-null float64
long       3376 non-null float64
dtypes: float64(2), object(5)
memory usage: 184.7+ KB


In [13]:
#with sampling
sample_pandas_df = airports_df.sample(1/3,seed=30).toPandas()
sample_pandas_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1111 entries, 0 to 1110
Data columns (total 7 columns):
iata       1111 non-null object
airport    1111 non-null object
city       1111 non-null object
state      1111 non-null object
country    1111 non-null object
lat        1111 non-null float64
long       1111 non-null float64
dtypes: float64(2), object(5)
memory usage: 60.8+ KB
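Because `sample()` draws each row independently, the number of rows returned is approximate (1111 here rather than exactly a third of 3376). When the goal is to cap how much data reaches the local session, the fraction can be derived from a row budget. A minimal sketch; `sample_fraction` and the `max_rows` value in the usage comment are illustrative assumptions, and `.limit(max_rows)` can be added for a hard cap.

```python
def sample_fraction(total_rows, max_rows):
    """Fraction to pass to DataFrame.sample() so that roughly max_rows rows come back."""
    if total_rows <= 0:
        return 1.0
    # Never ask for more than 100% of the data
    return min(1.0, max_rows / float(total_rows))

# Usage (max_rows is an illustrative budget, not a recommendation):
# fraction = sample_fraction(airports_df.count(), max_rows=100000)
# sample_pandas_df = airports_df.sample(fraction, seed=30).toPandas()
```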


### 2. Write data from PANDAS to HDFS - Using Spark

#### Read the data using Pandas

In [14]:
## read from pandas
import pandas as pd
airlines_pd_df = pd.read_csv("/home/cdsw/airlines/airports/airports.csv", sep=',', header='infer')
airlines_pd_df.sort_values(by=['state','airport'], inplace=True) # ordering to keep the same display order
airlines_pd_df.head()

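|      | iata | airport    | city     | state | country | lat       | long        |
|------|------|------------|----------|-------|---------|-----------|-------------|
| 776  | ADK  | Adak       | Adak     | AK    | USA     | 51.877964 | -176.646031 |
| 818  | AKK  | Akhiok     | Akhiok   | AK    | USA     | 56.938691 | -154.182556 |
| 3363 | Z13  | Akiachak   | Akiachak | AK    | USA     | 60.904532 | -161.42091  |
| 817  | AKI  | Akiak      | Akiak    | AK    | USA     | 60.904812 | -161.227019 |
| 1994 | KQA  | Akutan SPB | Akutan   | AK    | USA     | 54.132467 | -165.785311 |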


### Transform Pandas DataFrame to Spark DataFrame
With Spark 2.3 and up, integration with Pandas has been strengthened, notably through the use of Apache Arrow for faster data transfers [https://issues.apache.org/jira/browse/SPARK-20791]
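The configuration key that turns this on changed between Spark versions: Spark 2.3/2.4 use `spark.sql.execution.arrow.enabled`, while Spark 3.x deprecates it in favour of `spark.sql.execution.arrow.pyspark.enabled`. A small sketch that picks the key from `spark.version`; `arrow_conf_key` is a hypothetical helper.

```python
def arrow_conf_key(spark_version):
    """Return the config key that enables Arrow for toPandas() / createDataFrame(pandas_df)."""
    # Works with vendor version strings too, e.g. "2.4.0.7.1.7.0-551"
    major = int(spark_version.split(".")[0])
    if major >= 3:
        return "spark.sql.execution.arrow.pyspark.enabled"
    return "spark.sql.execution.arrow.enabled"

# Usage with the session created above:
# spark.conf.set(arrow_conf_key(spark.version), "true")
```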

In [15]:
# (optional) Enable Arrow-based optimised columnar data transfers ; Note : still marked as experimental
# spark.conf.set("spark.sql.execution.arrow.enabled", "true") # Note : Compatible only with pyarrow 0.8.0 
# https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html#ensure-pyarrow-installed

#(optional) good practice to define schema to prevent any type casting errors
from pyspark.sql.types import *

schema = StructType([StructField("iata", StringType(), True),
                     StructField("airport", StringType(), True),
                     StructField("city", StringType(), True),
                     StructField("state", StringType(), True),
                     StructField("country", StringType(), True),
                     StructField("lat",  DoubleType(), True),
                     StructField("long",  DoubleType(), True)
                    ])

spark_df=spark.createDataFrame(airlines_pd_df,schema=schema)
spark_df.orderBy(['state','airport']).show(5) # ordering to keep the same display order

+----+----------+--------+-----+-------+-----------+-------------------+
|iata|   airport|    city|state|country|        lat|               long|
+----+----------+--------+-----+-------+-----------+-------------------+
| ADK|      Adak|    Adak|   AK|    USA|51.87796389|       -176.6460306|
| AKK|    Akhiok|  Akhiok|   AK|    USA|56.93869083|       -154.1825556|
| Z13|  Akiachak|Akiachak|   AK|    USA|60.90453167|-161.42091000000002|
| AKI|     Akiak|   Akiak|   AK|    USA|60.90481194|       -161.2270189|
| KQA|Akutan SPB|  Akutan|   AK|    USA|54.13246694|       -165.7853111|
+----+----------+--------+-----+-------+-----------+-------------------+
only showing top 5 rows



### Write to HDFS - using Spark

In [16]:
## It's good practice to restructure data before writing to HDFS: Spark writes one file per partition,
## which can lead to lots of small files, which is counterproductive for both reads and writes.
## Re-organize the data using the "coalesce" function to set the number of files to be saved
import os
shared_root=os.environ["STORAGE"]+"/datalake/"
location = shared_root+'tmp/airlines/spark_write'  # shared_root already ends with '/'
print("location written: "+location)

spark_df.coalesce(2).write.parquet(location, mode='overwrite')

test_file_df = spark.read.parquet(location)
test_file_df.printSchema()

location written: s3a://demo-aws-2/datalake/tmp/airlines/spark_write
root
 |-- iata: string (nullable = true)
 |-- airport: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)
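`coalesce(2)` hard-codes the number of files. A common rule of thumb (an assumption here, not a CML requirement) is to aim for output files of roughly 128 MB and derive the partition count from an estimate of the data size. Note that `coalesce` can only reduce the number of partitions; increasing it requires `repartition`. `num_output_files` is a hypothetical helper.

```python
import math

def num_output_files(total_bytes, target_file_bytes=128 * 1024 * 1024):
    """Number of partitions to coalesce to so each output file is roughly target_file_bytes."""
    # At least one file, even for an empty or tiny dataset
    return max(1, int(math.ceil(total_bytes / float(target_file_bytes))))

# Usage (estimated_bytes is an assumption, e.g. the size of the input files on the object store):
# spark_df.coalesce(num_output_files(estimated_bytes)).write.parquet(location, mode='overwrite')
```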



### Write Data to Hive - using Spark
Spark's Hive integration makes it easy to save a DataFrame directly as a Hive table.

In [17]:
# Note : Ordering on write can help optimise reads later on. 
spark_df.orderBy(['state','airport']).coalesce(2)\
    .write.format('parquet').mode("overwrite")\
    .saveAsTable('flights.airports_new')

### 3. Read Data from Hive 
All Hive configurations are already injected into Spark, so Hive tables can be queried directly with `spark.sql()`.

In [18]:
sql_statement = '''show tables in flights'''
spark.sql(sql_statement).show()

+--------+------------+-----------+
|database|   tableName|isTemporary|
+--------+------------+-----------+
| flights|    airports|      false|
| flights|airports_new|      false|
| flights|    carriers|      false|
| flights| flights_raw|      false|
+--------+------------+-----------+



In [19]:
#read table
sql_statement = '''select * from flights.airports where state = "AK" '''
airports_df = spark.sql(sql_statement)
airports_df.show(10)

+----+----------+--------------+-----+-------+-----------+------------+
|iata|   airport|          city|state|country|        lat|        long|
+----+----------+--------------+-----+-------+-----------+------------+
| ADK|      Adak|          Adak|   AK|    USA|51.87796389|-176.6460306|
| AKK|    Akhiok|        Akhiok|   AK|    USA|56.93869083|-154.1825556|
| Z13|  Akiachak|      Akiachak|   AK|    USA|60.90453167|  -161.42091|
| AKI|     Akiak|         Akiak|   AK|    USA|60.90481194|-161.2270189|
| KQA|Akutan SPB|        Akutan|   AK|    USA|54.13246694|-165.7853111|
| AUK|  Alakanuk|      Alakanuk|   AK|    USA|62.68004417|-164.6599253|
| 5A8| Aleknagik|     Aleknagik|   AK|    USA|59.28256167|-158.6176725|
| 6A8| Allakaket|     Allakaket|   AK|    USA|66.55194444|-152.6222222|
| BIG| Allen AAF|Delta Junction|   AK|    USA|63.99454722|-145.7216417|
| AFM|    Ambler|        Ambler|   AK|    USA|67.10610472|-157.8536203|
+----+----------+--------------+-----+-------+-----------+------------+

In [20]:
#(OPTIONAL) convert to pandas 
airlines_pd_df = airports_df.toPandas()
airlines_pd_df.head()

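|   | iata | airport    | city     | state | country | lat       | long        |
|---|------|------------|----------|-------|---------|-----------|-------------|
| 0 | ADK  | Adak       | Adak     | AK    | USA     | 51.877964 | -176.646031 |
| 1 | AKK  | Akhiok     | Akhiok   | AK    | USA     | 56.938691 | -154.182556 |
| 2 | Z13  | Akiachak   | Akiachak | AK    | USA     | 60.904532 | -161.42091  |
| 3 | AKI  | Akiak      | Akiak    | AK    | USA     | 60.904812 | -161.227019 |
| 4 | KQA  | Akutan SPB | Akutan   | AK    | USA     | 54.132467 | -165.785311 |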


In [21]:
spark.stop() ## Release Spark resources

#### ***NOTE:*** The Pandas DataFrame is still available after `spark.stop()`, since it lives in the local Python session

In [22]:
airport_pandas_df.head()

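|   | iata | airport              | city             | state | country | lat       | long        |
|---|------|----------------------|------------------|-------|---------|-----------|-------------|
| 0 | 00M  | Thigpen              | Bay Springs      | MS    | USA     | 31.953765 | -89.234505  |
| 1 | 00R  | Livingston Municipal | Livingston       | TX    | USA     | 30.685861 | -95.017928  |
| 2 | 00V  | Meadow Lake          | Colorado Springs | CO    | USA     | 38.945749 | -104.569893 |
| 3 | 01G  | Perry-Warsaw         | Perry            | NY    | USA     | 42.741347 | -78.052081  |
| 4 | 01J  | Hilliard Airpark     | Hilliard         | FL    | USA     | 30.688012 | -81.905944  |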


## Option 2 - Read/Write Directly from Pandas (using an AWS access key and secret)


### Pandas can read directly from S3
Dependencies: the `s3fs` library
> Note: AWS credentials (access key and secret) must be configured ahead of time, e.g. through the AWS CLI

In [None]:
!pip3 install s3fs==0.4.0

In [None]:
%%bash
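# Note: best practice is to set the AWS access key and secret in config files or via the command line, NOT in scripts.
# Variables exported in a %%bash cell only exist in that cell's subprocess and are not visible to the
# Python kernel, so store them in the AWS CLI config files (~/.aws/credentials, ~/.aws/config) instead,
# where s3fs can pick them up.

aws configure set aws_access_key_id XXXXX
aws configure set aws_secret_access_key XXXXX
aws configure set default.region us-west-2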

In [None]:
import os
shared_root=os.environ["STORAGE"]+"/datalake/"
shared_path=shared_root+'tmp/airports.csv' #bucket location
print("location read: "+shared_path)
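s3fs and boto address buckets with the `s3://` scheme, whereas Hadoop-style paths such as `$STORAGE` use `s3a://`. If a given pandas/s3fs version rejects the `s3a://` prefix, a small helper can rewrite it before calling `pd.read_csv`. A sketch; `to_s3_url` is a hypothetical name.

```python
from urllib.parse import urlsplit, urlunsplit

def to_s3_url(path):
    """Rewrite a Hadoop-style s3a:// (or s3n://) URL to the s3:// scheme used by s3fs/boto."""
    parts = urlsplit(path)
    if parts.scheme in ("s3a", "s3n"):
        parts = parts._replace(scheme="s3")
    # Local paths and s3:// URLs are returned unchanged
    return urlunsplit(parts)

# Usage: pd.read_csv(to_s3_url(shared_path))
```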

In [None]:
import pandas as pd
# airports.csv is comma-separated, so the default separator (",") is the right one
airlines_pd_df = pd.read_csv(shared_path)
airlines_pd_df.head()
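The section title mentions writes as well: with `s3fs` installed, `DataFrame.to_csv` accepts the same kind of S3 URL as `read_csv`. A minimal round-trip sketch, assuming the credentials configured above; it is tested here on a local path, and the output file name in the usage comment is hypothetical.

```python
import pandas as pd

def roundtrip_csv(df, path):
    """Write a DataFrame to `path` with pandas, then read it back.

    With s3fs installed, `path` can be an S3 URL (assumption: credentials configured as above);
    the same calls also work on a local path.
    """
    # index=False avoids an extra 'Unnamed: 0' column when the file is read back
    df.to_csv(path, index=False)
    return pd.read_csv(path)

# Usage against the bucket (hypothetical file name):
# roundtrip_csv(airlines_pd_df, shared_root + "tmp/airports_pandas.csv")
```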