## Sample notebook: reading from and writing to HDFS directly with PySpark

#### Initialize Spark

In [1]:
import getpass
import os
import uuid

from pyspark.sql import SparkSession

In [2]:
# Tag the Spark application with the current user's name so concurrent runs by
# different users can be told apart. getpass.getuser() is used rather than
# os.getlogin(): os.getlogin() needs a controlling terminal and raises OSError
# when there is none, which is common for Jupyter kernels.
spark = (
    SparkSession.builder
    .appName('sample-' + getpass.getuser())
    .enableHiveSupport()
    .getOrCreate()
)

#### Read Parquet files directly from EAP

In [3]:
# Directory of the prod_tlv_documents table and the batch partition to load;
# change BATCH_ID to read a different batch.
DOCUMENTS_TABLE_DIR = '/data/gfctocon/work/hive/prod_tlv_documents'
BATCH_ID = 2500

# Reading a single partition directory directly means Spark does not add a
# `batch_id` column to the DataFrame; add .option('basePath', DOCUMENTS_TABLE_DIR)
# to the reader if that column is needed.
ds = spark.read.parquet('{}/batch_id={}'.format(DOCUMENTS_TABLE_DIR, BATCH_ID))

In [4]:
# Preview a few identifying columns (Spark's default of 20 rows, truncated cells).
preview_columns = ['thread_id', 'date', 'subject']
ds.select(preview_columns).show()

+--------------------+----------+--------------------+
|           thread_id|      date|             subject|
+--------------------+----------+--------------------+
|conversation_exch...|2019_03_05|EQRA/RiskAppsMain...|
|conversation_exch...|2019_03_05|Philippines Econo...|
|conversation_exch...|2019_03_05|RE: 2019 - 044 BE...|
|conversation_exch...|2019_03_05|[PROD] CENTR Even...|
|conversation_exch...|2019_03_05|       JFM New Issue|
|conversation_exch...|2019_03_05|Full Checks Colo ...|
|conversation_exch...|2019_03_05|SIT Pending Appro...|
|conversation_exch...|2019_03_05|RE: Receipt from ...|
|conversation_exch...|2019_03_05|(BIIB): BIIB to a...|
|conversation_exch...|2019_03_05|Citi Sales and Tr...|
|conversation_exch...|2019_03_05|SEV2  (INITIAL): ...|
|conversation_exch...|2019_03_05|CTE - Expense Rep...|
|conversation_exch...|2019_03_05| CAP Usage: $12.998B|
|conversation_exch...|2019_03_05| RE: AMC Lender Call|
|conversation_exch...|2019_03_05|Credit MAT RPT -2...|
|conversat

#### Show the schema of the DataFrame

In [5]:
# Print the column names, types and nullability Spark inferred from the Parquet files.
ds.printSchema()

root
 |-- document_id: string (nullable = true)
 |-- date: string (nullable = true)
 |-- thread_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- subject: string (nullable = true)
 |-- content_type: string (nullable = true)
 |-- message_type: string (nullable = true)
 |-- content: string (nullable = true)



#### Example: running Spark SQL against a DataFrame

In [10]:
# Expose the DataFrame to Spark SQL under the name `dsView`, then count the
# rows for each message_type.
ds.createOrReplaceTempView('dsView')
message_type_counts_sql = 'select message_type,count(1) cnt from dsView group by message_type'
resultDataFrame = spark.sql(message_type_counts_sql)
resultDataFrame.show()

+------------+---+
|message_type|cnt|
+------------+---+
|    exchange|522|
+------------+---+



In [11]:
# collect() brings every row back to the driver as a list of Row objects; that is
# cheap here because the grouped result has only one row per message_type.
resultDataFrame.collect()

[Row(message_type='exchange', cnt=522)]

#### Write a DataFrame to HDFS

In [12]:
# Write to a freshly generated directory name so repeated runs never collide:
# Spark's default save mode raises an error if the target path already exists.
uniqueDirectoryName = '/data/gfctocon/work/sampleoutput-{}'.format(uuid.uuid4().hex)
resultDataFrame.write.parquet(uniqueDirectoryName)
# Display where the data was written so it can be read back or cleaned up later.
uniqueDirectoryName

#### Read from a local file
#### This only works when Spark is running in LOCAL mode.
#### If your spark-defaults.conf configures cluster mode, the executors run on EAP and cannot see a file stored on this local machine.

In [14]:
# ds = spark.read.csv('file:///test.csv')

In [None]:
ds.show()

#### Run shell commands, including our Hadoop `hdfs` command, from within Jupyter
#### This only works for dev, because the `hdfs` command for UAT and prod prompts for Kerberos credentials.

In [15]:
# Lines starting with `!` are executed by the operating-system shell, not by the Python kernel.
# NOTE(review): the `dev` argument presumably selects the dev environment in our local `hdfs` wrapper — confirm.
!hdfs dev dfs -ls /data/gfctocon/work

New ticket is stored in cache file C:\Users\ae79644\krb5cc_ae79644
Found 87 items
-rw-r--r--   3 gf90162  gfctocon      26743 2019-03-04 13:04 /data/gfctocon/work/CashEquity_Mantas_Alerts_20181107.csv
drwxr-xr-x   - aa08467  gfctocon          0 2018-11-02 11:41 /data/gfctocon/work/aa08467
drwxr-xr-x   - gfctocon gfctocon          0 2018-12-27 12:47 /data/gfctocon/work/adam
drwxrwxrwx   - gfctocon gfctocon          0 2018-11-02 14:26 /data/gfctocon/work/adamtest
drwxrwxr-x   - ch10471  gfctocon          0 2016-10-12 14:18 /data/gfctocon/work/ahi_
drwxr-xr-x   - gfctocon gfctocon          0 2018-04-01 06:40 /data/gfctocon/work/alpha_
-rw-r--r--   3 gfctocon gfctocon        600 2018-03-22 08:54 /data/gfctocon/work/alpha_hive_migrations
drwxr-xr-x   - gfctocon gfctocon          0 2019-01-16 09:52 /data/gfctocon/work/archive
drwxrwxrwx   - gfctocon gfctocon          0 2019-09-04 10:06 /data/gfctocon/work/benchmark
drwxrwxrwx   - gfctocon gfctocon          0 2019-09-16 11:03 /data/gfctocon/w

Picked up _JAVA_OPTIONS: -Djava.security.krb5.conf=C:\Users\ae79644\adamrepo\hadoop\bin\\..\conf\dev\krb5.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/ae79644/adamrepo/hadoop/jars/pig-0.12.0-cdh5.9.1.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/ae79644/adamrepo/hadoop/jars/slf4j-log4j12-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/ae79644/adamrepo/hadoop/jars/slf4j-simple-1.7.5.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Picked up _JAVA_OPTIONS: -Djava.security.krb5.conf=C:\Users\ae79644\adamrepo\hadoop\bin\\..\conf\dev\krb5.conf
