# Reading / Writing files from HDFS in CDSW

In [81]:
%%bash
## Cleanup - delete the files if they exist
hdfs dfs -rm -f /tmp/1988.csv.bz2
hdfs dfs -rm -f /tmp/airports.csv

2020-01-06 17:22:37,111 INFO  [main] fs.TrashPolicyDefault (TrashPolicyDefault.java:moveToTrash(182)) - Moved: 'hdfs://mlamairessse-1.vpc.cloudera.com:8020/tmp/airports.csv' to trash at: hdfs://mlamairessse-1.vpc.cloudera.com:8020/user/systest/.Trash/Current/tmp/airports.csv




## Option 1: Use the HDFS CLI
### Copy files To and From the local file system and work from there
### **Applicable to:** **"small and medium"** datasets, i.e. small enough to be easily managed/processed locally. <br>
**NOTE:** **Avoid** this approach for **sensitive data and/or data that requires strong governance**: copying files to the local file system creates a break in the data chain of custody / security. <br>
This is not a best practice in terms of Data Management/Governance.

### 1. Copy data TO HDFS from local file system 
#### Command Line

In [87]:
### local and HDFS Paths 
local_path="/home/cdsw/airlines/airports/airports.csv"
hdfs_path="/tmp/"

In [78]:
%%bash
### Copy to HDFS - using HDFS client - from Bash
# NOTE: Only functional from a Bash script or an interactive session 
hdfs dfs -copyFromLocal -f /home/cdsw/airlines/airports/airports.csv /tmp/
hdfs dfs -ls /tmp

Found 6 items
d---------   - hdfs    supergroup          0 2020-01-06 17:18 /tmp/.cloudera_health_monitoring_canary_files
-rw-r--r--   2 systest supergroup     100069 2020-01-06 12:29 /tmp/airport.zip
-rw-r--r--   2 systest supergroup     244438 2020-01-06 17:19 /tmp/airports.csv
-rw-r--r--   2 systest supergroup     100070 2020-01-06 17:01 /tmp/airports.zip
drwxrwxrwx   - hive    supergroup          0 2020-01-05 14:30 /tmp/hive
drwxrwxrwt   - mapred  hadoop              0 2020-01-05 14:30 /tmp/logs




#### Using Python (subprocess)

In [100]:
### Copy to HDFS - Using HDFS Client - From Python (using subprocess)
from subprocess import Popen, PIPE

def hdfs_write(local_path, hdfs_path):
    """Copy a local file to HDFS with `hadoop fs -put`; raise OSError on failure."""
    put = Popen(["hadoop", "fs", "-put", local_path, hdfs_path], stdout=PIPE, stderr=PIPE)
    stdout, stderr = put.communicate()

    ## Error handling: -put fails if the target file already exists (add "-f" to overwrite)
    if put.returncode != 0:
        raise OSError(stderr)

hdfs_write(local_path, hdfs_path)

# Show HDFS path
!hdfs dfs -ls /tmp

OSError: b"WARNING: log4j.properties is not found. HADOOP_CONF_DIR may be incomplete.\nput: `/tmp/airports.csv': File exists\n"
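The `OSError` above is raised because `/tmp/airports.csv` was already copied by the Bash cell, and `-put` refuses to overwrite by default. A minimal sketch of a variant that makes overwriting explicit and can check for the target first, assuming the `hdfs` client is on the `PATH`; the helper names `put_command`, `hdfs_exists` and `hdfs_put` are hypothetical. It relies only on the standard `hdfs dfs -put -f` (overwrite) and `hdfs dfs -test -e` (exit code 0 when the path exists) options.

```python
import subprocess

def put_command(local_path, hdfs_path, overwrite=False):
    """Build the `hdfs dfs -put` argument list; `-f` overwrites an existing target."""
    cmd = ["hdfs", "dfs", "-put"]
    if overwrite:
        cmd.append("-f")
    cmd += [local_path, hdfs_path]
    return cmd

def hdfs_exists(hdfs_path):
    """Return True if `hdfs dfs -test -e` reports that the path exists (exit code 0)."""
    result = subprocess.run(["hdfs", "dfs", "-test", "-e", hdfs_path],
                            stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
    return result.returncode == 0

def hdfs_put(local_path, hdfs_path, overwrite=False):
    """Copy a local file to HDFS, raising OSError with the client's stderr on failure."""
    result = subprocess.run(put_command(local_path, hdfs_path, overwrite),
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
    if result.returncode != 0:
        raise OSError(result.stderr.decode("utf-8", errors="replace"))

# Usage (requires a configured HDFS client):
# hdfs_put(local_path, hdfs_path, overwrite=True)
# hdfs_exists("/tmp/airports.csv")
```

Keeping the command builder separate from the call that runs it makes the flags easy to inspect without touching the cluster.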

####  Use a pipeline to download a file directly to HDFS

In [30]:
%%bash
## NOTE: Only functional from a Bash script or an interactive session 
export DOWNLOAD_LINK='https://mlamairesse.s3-eu-west-1.amazonaws.com/Airlines_Dataset/1988.csv.bz2'
curl $DOWNLOAD_LINK | hadoop fs -put - /tmp/1988.csv.bz2

hdfs dfs -ls /tmp

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100 47.2M  100 47.2M    0     0  7659k      0  0:00:06  0:00:06 --:--:-- 10.7M
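The same "stream straight into HDFS" pattern can be reproduced from Python when a Bash cell is not an option. A sketch, assuming the `hdfs` client is on the `PATH`; `pipe_to_command` is a hypothetical helper that copies any binary file-like object (for example the response returned by `urllib.request.urlopen`) into a command's stdin in 1 MiB chunks, so the file never lands on the local disk. As in the Bash version, `-` as the source makes `hdfs dfs -put` read from stdin.

```python
import shutil
import subprocess
import tempfile

def pipe_to_command(src, cmd, chunk_size=1024 * 1024):
    """Stream the binary file-like object `src` into the stdin of `cmd`.

    Raises OSError with the command's stderr if it exits with a non-zero code.
    """
    # stderr goes to a temporary file so a chatty command cannot block the pipe
    with tempfile.TemporaryFile() as err:
        proc = subprocess.Popen(cmd, stdin=subprocess.PIPE,
                                stdout=subprocess.DEVNULL, stderr=err)
        try:
            shutil.copyfileobj(src, proc.stdin, chunk_size)
        except BrokenPipeError:
            pass  # the command stopped reading early; its exit code says why
        finally:
            try:
                proc.stdin.close()  # signals end of input to the command
            except BrokenPipeError:
                pass
        returncode = proc.wait()
        err.seek(0)
        message = err.read().decode("utf-8", errors="replace")
    if returncode != 0:
        raise OSError("{} exited with {}: {}".format(" ".join(cmd), returncode, message))

# Usage (assumes the hdfs client is on the PATH):
# from urllib.request import urlopen
# with urlopen("https://mlamairesse.s3-eu-west-1.amazonaws.com/Airlines_Dataset/1988.csv.bz2") as resp:
#     pipe_to_command(resp, ["hdfs", "dfs", "-put", "-", "/tmp/1988.csv.bz2"])
```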


### 2. Copy data FROM HDFS to local file system
#### Command line 

In [None]:
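## Copy the file from HDFS to the local project directory
## (relative HDFS paths resolve against the user's HDFS home directory, /user/<username>)
!hdfs dfs -get airlines/airports/airports.csv /home/cdsw/airlines/airports/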

#### Using Python (subprocess) 

In [116]:
from subprocess import Popen, PIPE

def hdfs_read(hdfs_path):
    """Return the content of an HDFS file as a bytes object (decompressed by `-text`)."""
    proc = Popen(['hadoop', 'fs', '-text', hdfs_path], stdout=PIPE, stderr=PIPE)
    stdout, stderr = proc.communicate()

    if proc.returncode != 0:
        if 'No such file or directory' in stderr.decode('utf-8'):
            raise FileNotFoundError('No such file or directory: {}'.format(hdfs_path))
        else:
            raise OSError(stderr)

    return stdout

test = hdfs_read("airlines/airports/airports.csv").decode('utf-8') # decode the bytes as a string
test[0:10] # first 10 characters

'"iata","ai'

#### Using Python (subprocess) - directly from Pandas

In [110]:
### NOTE: The output of hdfs_read can be loaded directly into Pandas with a small transformation:
### read_csv expects a path or a file-like object, so the decoded string is wrapped in StringIO
### https://pandas.pydata.org/pandas-docs/stable/user_guide/io.html
from io import StringIO
import pandas as pd
airlines_pd_df = pd.read_csv(StringIO(hdfs_read("airlines/airports/airports.csv").decode('utf-8')),
                             sep=',', header='infer')
airlines_pd_df.head()



| | iata | airport | city | state | country | lat | long |
|---|---|---|---|---|---|---|---|
| 0 | 00M | Thigpen | Bay Springs | MS | USA | 31.953765 | -89.234505 |
| 1 | 00R | Livingston Municipal | Livingston | TX | USA | 30.685861 | -95.017928 |
| 2 | 00V | Meadow Lake | Colorado Springs | CO | USA | 38.945749 | -104.569893 |
| 3 | 01G | Perry-Warsaw | Perry | NY | USA | 42.741347 | -78.052081 |
| 4 | 01J | Hilliard Airpark | Hilliard | FL | USA | 30.688012 | -81.905944 |
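`pd.read_csv` also accepts binary buffers, so the bytes returned by `hdfs_read` can be wrapped in `io.BytesIO` instead of being decoded first; pandas then handles the decoding through its `encoding` argument (UTF-8 by default). A small sketch; `csv_bytes_to_dataframe` is a hypothetical helper name.

```python
import io

import pandas as pd

def csv_bytes_to_dataframe(raw, **read_csv_kwargs):
    """Parse CSV content held in a bytes object (e.g. the output of hdfs_read)."""
    # read_csv accepts any binary file-like object, so no explicit decode is needed
    return pd.read_csv(io.BytesIO(raw), **read_csv_kwargs)

# Usage in this notebook:
# airports_pd_df = csv_bytes_to_dataframe(hdfs_read("airlines/airports/airports.csv"))
```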


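### 3. Read data FROM HDFS using the console (get a feel for what the data looks like)
**NOTE:** using `-text` rather than `-cat` allows reading compressed files (e.g. `.gz`, `.bz2`) and SequenceFiles. The trailing `text: Unable to write to output stream.` message is harmless: it appears because `head` closes the pipe after 5 lines.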

In [6]:
!hdfs dfs -text airlines/airports/airports.csv | head -n 5

"iata","airport","city","state","country","lat","long"
"00M","Thigpen ","Bay Springs","MS","USA",31.95376472,-89.23450472
"00R","Livingston Municipal","Livingston","TX","USA",30.68586111,-95.01792778
"00V","Meadow Lake","Colorado Springs","CO","USA",38.94574889,-104.5698933
"01G","Perry-Warsaw","Perry","NY","USA",42.74134667,-78.05208056
text: Unable to write to output stream.
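The same quick peek can be taken from Python without loading the whole file into memory (as `hdfs_read` does). A sketch, assuming the `hdfs` client is on the `PATH`; `head_of_command` is a hypothetical helper that reads only the first `n` lines of a command's output and then stops the command, which is what `| head -n 5` does in the shell.

```python
import subprocess

def head_of_command(cmd, n=5):
    """Return the first `n` lines printed by `cmd`, then stop the command.

    Python equivalent of `<cmd> | head -n 5`: only the first lines are read,
    not the whole output.
    """
    if n <= 0:
        return []
    lines = []
    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
    try:
        for raw in proc.stdout:
            lines.append(raw.decode("utf-8", errors="replace").rstrip("\r\n"))
            if len(lines) >= n:
                break
    finally:
        proc.stdout.close()
        proc.kill()  # stop the command if it is still producing output
        proc.wait()
    return lines

# Usage (assumes the hdfs client is on the PATH):
# head_of_command(["hdfs", "dfs", "-text", "airlines/airports/airports.csv"], 5)
```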


## Option 2 - Read/Write using Spark
### **Applicable to:** All datasets, and large ones in particular <br>
Using Spark, the data can be processed where it lives, without copying it first, which is much cleaner in terms of chain of custody. <br>
**NOTE:** For large datasets, it also allows filtering and pre-processing to run in a distributed manner, which is much more efficient.

### 1. Reading data from HDFS

### Start the spark session
Custom session configuration can be defined either in the session parameters as below OR
inside a `spark-defaults.conf` file stored at the root of the project (in which case the configs become project wide)
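The project-wide alternative can be sketched as a file written from Python. This assumes the project root is `/home/cdsw` (the directory used for local files in this notebook) and that the file follows the usual `spark-defaults.conf` layout of one whitespace-separated `key value` pair per line; `format_spark_defaults` and `write_spark_defaults` are hypothetical helpers, and writing overwrites any existing file. Per the Spark configuration docs, values set directly on the session/SparkConf take precedence over `spark-submit` flags, which take precedence over `spark-defaults.conf`.

```python
from pathlib import Path

def format_spark_defaults(settings):
    """Render settings in spark-defaults.conf format: one `key value` pair per line."""
    return "".join("{} {}\n".format(key, value) for key, value in settings.items())

def write_spark_defaults(settings, project_root="/home/cdsw"):
    """Write spark-defaults.conf at the (assumed) project root and return its path."""
    path = Path(project_root) / "spark-defaults.conf"
    path.write_text(format_spark_defaults(settings))
    return path

# Same resource settings as the SparkSession.builder cell that follows
SESSION_SETTINGS = {
    "spark.executor.memory": "2g",
    "spark.executor.cores": "2",
    "spark.driver.memory": "2g",
    "spark.executor.instances": "2",
}

# Usage (takes effect for sessions started afterwards):
# write_spark_defaults(SESSION_SETTINGS)
```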

In [14]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
    .master("yarn")\
    .appName("Airline")\
    .config("spark.executor.memory","2g")\
    .config("spark.executor.cores","2")\
    .config("spark.driver.memory","2g")\
    .config("spark.executor.instances","2")\
    .getOrCreate()

In [15]:
#Adding a link to the Spark UI for demo purposes
from IPython.display import HTML
import os
HTML('<a href="http://spark-{}.{}" target="_blank" >Spark UI</a>'.\
     format(os.getenv("CDSW_ENGINE_ID"),os.getenv("CDSW_DOMAIN")))

### Read the Data - csv file stored on HDFS 

In [16]:
path='airlines/airports/airports.csv' #HDFS location

airports_df = spark.read.csv(
    path=path,
    header=True,
    sep=',',
    inferSchema=True,
    nullValue=None
)
airports_df.printSchema()

root
 |-- iata: string (nullable = true)
 |-- airport: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- lat: double (nullable = true)
 |-- long: double (nullable = true)



**Note**: in the above example, the schema is inferred from the file. <br>
It is good practice to set the schema explicitly instead: it prevents erroneous type casting and avoids the extra pass over the data that `inferSchema` requires.
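Besides a `StructType`, recent PySpark versions also accept a DDL-formatted string (e.g. `"iata STRING, lat DOUBLE"`) as the `schema` argument of the reader. A sketch of building that string from `(name, type)` pairs; `ddl_schema` and `AIRPORT_COLUMNS` are hypothetical names. Column names are backtick-quoted so that a name such as `long` is always parsed as an identifier.

```python
def ddl_schema(columns):
    """Build a Spark DDL schema string from (name, type) pairs.

    Names are backtick-quoted; a backtick inside a name is escaped by doubling it.
    """
    parts = []
    for name, sql_type in columns:
        quoted = "`" + name.replace("`", "``") + "`"
        parts.append("{} {}".format(quoted, sql_type))
    return ", ".join(parts)

AIRPORT_COLUMNS = [("iata", "STRING"), ("airport", "STRING"), ("city", "STRING"),
                   ("state", "STRING"), ("country", "STRING"),
                   ("lat", "DOUBLE"), ("long", "DOUBLE")]

# Usage (same columns as the StructType in the next cell):
# airports_df = spark.read.csv(path, schema=ddl_schema(AIRPORT_COLUMNS), header=True, sep=',')
```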

In [17]:
from pyspark.sql.types import *

path='airlines/airports/airports.csv' #HDFS location
schema = StructType([StructField("iata", StringType(), True),
                     StructField("airport", StringType(), True),
                     StructField("city", StringType(), True),
                     StructField("state", StringType(), True),
                     StructField("country", StringType(), True),
                     StructField("lat",  DoubleType(), True),
                     StructField("long",  DoubleType(), True)
                    ])

airports_df = spark.read.csv(
    path=path,
    schema=schema,
    header=True,
    sep=',',
    nullValue=None
).cache()
airports_df.show(5)

+----+--------------------+----------------+-----+-------+-----------+------------+
|iata|             airport|            city|state|country|        lat|        long|
+----+--------------------+----------------+-----+-------+-----------+------------+
| 00M|            Thigpen |     Bay Springs|   MS|    USA|31.95376472|-89.23450472|
| 00R|Livingston Municipal|      Livingston|   TX|    USA|30.68586111|-95.01792778|
| 00V|         Meadow Lake|Colorado Springs|   CO|    USA|38.94574889|-104.5698933|
| 01G|        Perry-Warsaw|           Perry|   NY|    USA|42.74134667|-78.05208056|
| 01J|    Hilliard Airpark|        Hilliard|   FL|    USA| 30.6880125|-81.90594389|
+----+--------------------+----------------+-----+-------+-----------+------------+
only showing top 5 rows



### (optional) Transform data to Pandas Dataframe
#### Once converted, **ALL DATA is collected locally** (to the driver) and distributed processing ends <br>
* **Applicable to: small to medium datasets**, i.e. datasets that can be easily managed/processed locally <br>
* When working with **large datasets**: **data should be sampled** before bringing it locally

> **Good Practice**: stop the Spark session with `spark.stop()` to release cluster resources once the data is copied

In [18]:
#without sampling
import pandas 
airport_pandas_df = airports_df.toPandas()
airport_pandas_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 3376 entries, 0 to 3375
Data columns (total 7 columns):
iata       3376 non-null object
airport    3376 non-null object
city       3376 non-null object
state      3376 non-null object
country    3376 non-null object
lat        3376 non-null float64
long       3376 non-null float64
dtypes: float64(2), object(5)
memory usage: 184.7+ KB


In [19]:
#with sampling
sample_pandas_df = airports_df.sample(1/3,seed=30).toPandas()
sample_pandas_df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1111 entries, 0 to 1110
Data columns (total 7 columns):
iata       1111 non-null object
airport    1111 non-null object
city       1111 non-null object
state      1111 non-null object
country    1111 non-null object
lat        1111 non-null float64
long       1111 non-null float64
dtypes: float64(2), object(5)
memory usage: 60.8+ KB
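When sampling a large dataset, it is often easier to think in terms of a target row count than a fraction. A sketch, with `sample_fraction` as a hypothetical helper. `DataFrame.sample` without replacement keeps each row independently with the given probability, so the result size is only approximate: above, a fraction of 1/3 of 3376 rows returned 1111 rows rather than about 1125.

```python
def sample_fraction(total_rows, target_rows):
    """Fraction to pass to DataFrame.sample() to get roughly `target_rows` rows."""
    if total_rows <= 0:
        return 1.0
    # Never ask for more than the whole dataset
    return min(1.0, target_rows / total_rows)

# Usage (count() triggers one Spark job):
# fraction = sample_fraction(airports_df.count(), 1000)
# sample_pandas_df = airports_df.sample(fraction=fraction, seed=30).toPandas()
```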


### 2. Write data to HDFS/Hive - Using Spark

### Read the data using Pandas

In [20]:
## read from pandas
import pandas as pd
airlines_pd_df = pd.read_csv("/home/cdsw/airlines/airports/airports.csv", sep=',', header='infer')
airlines_pd_df.sort_values(by=['state','airport'], inplace=True) # sort to keep the same display order
airlines_pd_df.head()

| | iata | airport | city | state | country | lat | long |
|---|---|---|---|---|---|---|---|
| 776 | ADK | Adak | Adak | AK | USA | 51.877964 | -176.646031 |
| 818 | AKK | Akhiok | Akhiok | AK | USA | 56.938691 | -154.182556 |
| 3363 | Z13 | Akiachak | Akiachak | AK | USA | 60.904532 | -161.42091 |
| 817 | AKI | Akiak | Akiak | AK | USA | 60.904812 | -161.227019 |
| 1994 | KQA | Akutan SPB | Akutan | AK | USA | 54.132467 | -165.785311 |


### Transform Pandas DataFrame to Spark DataFrame
With Spark 2.3 and up, integration with Pandas has been reinforced, notably through the use of Apache Arrow for faster data transfers [https://issues.apache.org/jira/browse/SPARK-20791]
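The Arrow switch changed name between major versions: Spark 3.0 deprecated `spark.sql.execution.arrow.enabled` in favour of `spark.sql.execution.arrow.pyspark.enabled`. A small sketch that picks the key from the running version string; `arrow_conf_key` is a hypothetical helper, and Arrow transfers also require `pyarrow` to be installed.

```python
def arrow_conf_key(spark_version):
    """Return the Arrow config key for PySpark <-> pandas transfers."""
    # Vendor builds such as "2.4.0.cloudera2" still start with the major version
    major = int(spark_version.split(".")[0])
    if major >= 3:
        return "spark.sql.execution.arrow.pyspark.enabled"
    return "spark.sql.execution.arrow.enabled"

# Usage: spark.conf.set(arrow_conf_key(spark.version), "true")
```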

In [23]:
# (optional) Enable Arrow-based optimised columnar data transfers (requires pyarrow; marked experimental in Spark 2.x)
# In Spark 3.x the key is spark.sql.execution.arrow.pyspark.enabled
#spark.conf.set("spark.sql.execution.arrow.enabled", "true")

#(optional) good practice to define schema to prevent any type casting errors
schema = StructType([StructField("iata", StringType(), True),
                     StructField("airport", StringType(), True),
                     StructField("city", StringType(), True),
                     StructField("state", StringType(), True),
                     StructField("country", StringType(), True),
                     StructField("lat",  DoubleType(), True),
                     StructField("long",  DoubleType(), True)
                    ])

spark_df=spark.createDataFrame(airlines_pd_df,schema=schema)
spark_df.orderBy(['state','airport']).show(5) # sort to keep the same display order

+----+----------+--------+-----+-------+-----------+-------------------+
|iata|   airport|    city|state|country|        lat|               long|
+----+----------+--------+-----+-------+-----------+-------------------+
| ADK|      Adak|    Adak|   AK|    USA|51.87796389|       -176.6460306|
| AKK|    Akhiok|  Akhiok|   AK|    USA|56.93869083|       -154.1825556|
| Z13|  Akiachak|Akiachak|   AK|    USA|60.90453167|-161.42091000000002|
| AKI|     Akiak|   Akiak|   AK|    USA|60.90481194|       -161.2270189|
| KQA|Akutan SPB|  Akutan|   AK|    USA|54.13246694|       -165.7853111|
+----+----------+--------+-----+-------+-----------+-------------------+
only showing top 5 rows



### Write to HDFS - using Spark

In [24]:
spark_df.orderBy(['state','airport']).coalesce(2)\
    .write.parquet('/tmp/airlines/', mode='overwrite')

In [25]:
!hdfs dfs -ls /tmp/airlines/

Found 3 items
-rw-r--r--   2 systest supergroup          0 2019-12-04 09:16 /tmp/airlines/_SUCCESS
-rw-r--r--   2 systest supergroup      72995 2019-12-04 09:16 /tmp/airlines/part-00000-ecc21703-e81a-423a-b66f-64f279efc48b-c000.snappy.parquet
-rw-r--r--   2 systest supergroup      72510 2019-12-04 09:16 /tmp/airlines/part-00001-ecc21703-e81a-423a-b66f-64f279efc48b-c000.snappy.parquet
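To check a Spark write programmatically, the listing above can be parsed: a successful job leaves a `_SUCCESS` marker, and `coalesce(2)` produced two `part-*` files. A sketch; `parse_hdfs_ls` and `summarize_spark_output` are hypothetical helpers, and the listing text is assumed to come from `hdfs dfs -ls` run through `subprocess`.

```python
def parse_hdfs_ls(output):
    """Parse `hdfs dfs -ls` output into dicts with permissions, size and path."""
    entries = []
    for line in output.splitlines():
        # perms, replication, owner, group, size, date, time, path (path may contain spaces)
        fields = line.split(None, 7)
        if len(fields) != 8:
            continue  # e.g. the "Found 3 items" header
        entries.append({"permissions": fields[0], "size": int(fields[4]), "path": fields[7]})
    return entries

def summarize_spark_output(entries):
    """Report whether a _SUCCESS marker exists and how many part files were written."""
    names = [entry["path"].rsplit("/", 1)[-1] for entry in entries]
    parts = [entry for entry, name in zip(entries, names) if name.startswith("part-")]
    return {"success": "_SUCCESS" in names,
            "part_files": len(parts),
            "part_bytes": sum(entry["size"] for entry in parts)}

# Usage (assumes the hdfs client is on the PATH):
# import subprocess
# listing = subprocess.run(["hdfs", "dfs", "-ls", "/tmp/airlines/"],
#                          stdout=subprocess.PIPE, universal_newlines=True).stdout
# summarize_spark_output(parse_hdfs_ls(listing))
```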


### Write Data to Hive - using Spark
Spark's Hive integration lets `saveAsTable` write the data and register it as a table in the Hive metastore in a single step.

In [28]:
# Note: sorting before the write clusters values, so Parquet min/max statistics can let later filtered reads skip data
spark_df.orderBy(['state','airport']).coalesce(2)\
    .write.format('parquet').mode("overwrite")\
    .saveAsTable('default.airports')

### 3. Read Data from Hive
All Hive configurations are already injected into Spark, so Hive tables can be queried directly with `spark.sql`.

In [29]:
sql_statement = '''show tables in default'''
spark.sql(sql_statement).show()

+--------+------------------+-----------+
|database|         tableName|isTemporary|
+--------+------------------+-----------+
| default|          airports|      false|
| default|         customers|      false|
| default|         sample_07|      false|
| default|         sample_08|      false|
| default|          web_logs|      false|
| default|        wineds_ext|      false|
| default|wineds_ext_nolabel|      false|
+--------+------------------+-----------+



In [36]:
#read table
sql_statement = '''select * from default.airports where state = "AK" '''
airports_df = spark.sql(sql_statement)
airports_df.show(10)

+----+----------+--------------+-----+-------+-----------+-------------------+
|iata|   airport|          city|state|country|        lat|               long|
+----+----------+--------------+-----+-------+-----------+-------------------+
| ADK|      Adak|          Adak|   AK|    USA|51.87796389|       -176.6460306|
| AKK|    Akhiok|        Akhiok|   AK|    USA|56.93869083|       -154.1825556|
| Z13|  Akiachak|      Akiachak|   AK|    USA|60.90453167|-161.42091000000002|
| AKI|     Akiak|         Akiak|   AK|    USA|60.90481194|       -161.2270189|
| KQA|Akutan SPB|        Akutan|   AK|    USA|54.13246694|       -165.7853111|
| AUK|  Alakanuk|      Alakanuk|   AK|    USA|62.68004417|       -164.6599253|
| 5A8| Aleknagik|     Aleknagik|   AK|    USA|59.28256167|       -158.6176725|
| 6A8| Allakaket|     Allakaket|   AK|    USA|66.55194444|-152.62222219999998|
| BIG| Allen AAF|Delta Junction|   AK|    USA|63.99454722|       -145.7216417|
| AFM|    Ambler|        Ambler|   AK|    USA|67.106

In [37]:
#(OPTIONAL) convert to pandas 
airlines_pd_df = airports_df.toPandas()
airlines_pd_df.head()

| | iata | airport | city | state | country | lat | long |
|---|---|---|---|---|---|---|---|
| 0 | ADK | Adak | Adak | AK | USA | 51.877964 | -176.646031 |
| 1 | AKK | Akhiok | Akhiok | AK | USA | 56.938691 | -154.182556 |
| 2 | Z13 | Akiachak | Akiachak | AK | USA | 60.904532 | -161.42091 |
| 3 | AKI | Akiak | Akiak | AK | USA | 60.904812 | -161.227019 |
| 4 | KQA | Akutan SPB | Akutan | AK | USA | 54.132467 | -165.785311 |


In [38]:
spark.stop() ## Release Spark resources

#### ***NOTE:*** The Pandas DataFrame is still available after `spark.stop()`, since it lives in local memory

In [39]:
airport_pandas_df.head()

| | iata | airport | city | state | country | lat | long |
|---|---|---|---|---|---|---|---|
| 0 | 00M | Thigpen | Bay Springs | MS | USA | 31.953765 | -89.234505 |
| 1 | 00R | Livingston Municipal | Livingston | TX | USA | 30.685861 | -95.017928 |
| 2 | 00V | Meadow Lake | Colorado Springs | CO | USA | 38.945749 | -104.569893 |
| 3 | 01G | Perry-Warsaw | Perry | NY | USA | 42.741347 | -78.052081 |
| 4 | 01J | Hilliard Airpark | Hilliard | FL | USA | 30.688012 | -81.905944 |


## Option 3 - Read Directly from Pandas (Hive only, via HiveServer2)


### 1. Read using the Pandas SQL interface (compatible with PyHive or SQLAlchemy)
**NOTE:** You need to know the HiveServer2 host and port (default 10000).

In [40]:
#with pyhive
from pyhive import hive
import pandas as pd
conn=hive.Connection(host='mlamairesse-training-1.vpc.cloudera.com', port=10000, auth='KERBEROS', 
                     kerberos_service_name='hive')
airlines_pd_df = pd.read_sql('select * from default.airports',conn)
airlines_pd_df.head()

| | airports.iata | airports.airport | airports.city | airports.state | airports.country | airports.lat | airports.long |
|---|---|---|---|---|---|---|---|
| 0 | ADK | Adak | Adak | AK | USA | 51.877964 | -176.646031 |
| 1 | AKK | Akhiok | Akhiok | AK | USA | 56.938691 | -154.182556 |
| 2 | Z13 | Akiachak | Akiachak | AK | USA | 60.904532 | -161.42091 |
| 3 | AKI | Akiak | Akiak | AK | USA | 60.904812 | -161.227019 |
| 4 | KQA | Akutan SPB | Akutan | AK | USA | 54.132467 | -165.785311 |
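With the PyHive connection, the column names come back prefixed with the table name (`airports.iata`, ...). Hive's `hive.resultset.use.unique.column.names` setting (set to `false`) controls this behaviour, and PyHive's `hive.Connection` accepts a `configuration` dict through which it can presumably be passed; the sketch below takes the simpler route of renaming the columns client-side. `strip_table_prefix` is a hypothetical helper.

```python
def strip_table_prefix(columns):
    """Drop the 'table.' prefix from result column names, leaving other names unchanged."""
    return [name.split(".", 1)[-1] for name in columns]

# Usage on the DataFrame returned by pd.read_sql with the PyHive connection:
# airlines_pd_df.columns = strip_table_prefix(airlines_pd_df.columns)
```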


In [41]:
#with SQLAlchemy
from sqlalchemy import create_engine
import pandas as pd
conn = create_engine("hive://systest@mlamairesse-training-1.vpc.cloudera.com:10000/default",
                     connect_args={'auth': 'KERBEROS','kerberos_service_name': 'hive'})
#engine = create_engine("hive://<kerberos-username>@<hive-host>:<hive-port>/<db-name>",connect_args={'auth': 'KERBEROS','kerberos_service_name': 'hive'})
airlines_pd_df = pd.read_sql('select * from default.airports',conn)
airlines_pd_df.head()

| | iata | airport | city | state | country | lat | long |
|---|---|---|---|---|---|---|---|
| 0 | ADK | Adak | Adak | AK | USA | 51.877964 | -176.646031 |
| 1 | AKK | Akhiok | Akhiok | AK | USA | 56.938691 | -154.182556 |
| 2 | Z13 | Akiachak | Akiachak | AK | USA | 60.904532 | -161.42091 |
| 3 | AKI | Akiak | Akiak | AK | USA | 60.904812 | -161.227019 |
| 4 | KQA | Akutan SPB | Akutan | AK | USA | 54.132467 | -165.785311 |
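For Hive tables too large to load in one go, `pd.read_sql` accepts a `chunksize` argument and then returns an iterator of DataFrames, so results can be aggregated piece by piece. A sketch; `count_by_column_chunked` is a hypothetical helper, and an in-memory SQLite table stands in for HiveServer2 so the example runs anywhere (with the engine above you would pass `conn` and `default.airports` instead). How many rows the driver actually buffers per fetch is driver-dependent; the sketch only ensures pandas builds one chunk at a time.

```python
import sqlite3

import pandas as pd

def count_by_column_chunked(query, conn, column, chunksize=50000):
    """Count the values of `column` over a query result, `chunksize` rows at a time."""
    totals = {}
    # With chunksize set, pd.read_sql yields one DataFrame per chunk
    for chunk in pd.read_sql(query, conn, chunksize=chunksize):
        for value, n in chunk[column].value_counts().items():
            totals[value] = totals.get(value, 0) + int(n)
    return totals

# Self-contained demo: an in-memory SQLite table standing in for Hive
demo = sqlite3.connect(":memory:")
demo.execute("CREATE TABLE airports (iata TEXT, state TEXT)")
demo.executemany("INSERT INTO airports VALUES (?, ?)",
                 [("ADK", "AK"), ("AKK", "AK"), ("00M", "MS"), ("00R", "TX")])
counts = count_by_column_chunked("SELECT * FROM airports", demo, "state", chunksize=2)
```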
