# Purpose

```
   Create a Databricks notebook (with Python as the base language)
   
   Investigate the Databricks DBFS 
   
   Create and read Parquet files, and understand them better 
   
   Create and read JSON files, display etc 
   
   ```

<br>

#### Spark SQL provides support for both reading and writing Parquet files that automatically preserves the schema of the original data !

In [5]:
# Parquet is a columnar format that is supported by many other data processing systems. 

# Spark SQL provides support for both reading and writing Parquet files that automatically 
# preserves the schema of the original data. 

# When writing Parquet files, all columns are automatically converted to be nullable for compatibility reasons.

# Examine the spark context

In [8]:
# return all of the methods possible under this particular input object
def list_out_object_methods(your_object):
  for method in dir(your_object):
    if not method.startswith("_"):
      print(method)
      
# e.g. I create a spark object and want to see everything I can call on it:
# this function lists ALL of its public methods and attributes ! 
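
A small variation on the helper above, as a sketch only: it returns the names instead of printing them, and it separates callable methods from plain attributes. That distinction is handy because printing a method without calling it only shows a bound-method object. The names `split_public_members` and `Demo` are made up for illustration.

```python
def split_public_members(your_object):
    """Return (attributes, methods) for the public names on an object."""
    attributes, methods = [], []
    for name in dir(your_object):
        if name.startswith("_"):
            continue
        try:
            value = getattr(your_object, name)
        except Exception:
            # some properties raise when accessed, so skip those
            continue
        (methods if callable(value) else attributes).append(name)
    return attributes, methods


class Demo:
    """Hypothetical stand-in object, used only to show the helper working."""
    version = "1.0"

    def greet(self):
        return "hi"


attrs, meths = split_public_members(Demo())
print("attributes:", attrs)
print("methods:", meths)
```

In a notebook the same call should work on `spark`, `sc`, or a DataFrame.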

In [9]:
spark

In [10]:
# let's confirm we have a spark context created
# if you know me, you know I always do these first couple of steps
sc

In [11]:
list_out_object_methods(spark)

In [12]:
# list all the methods for the spark context, in case you want to look around
list_out_object_methods(sc)   # then you just enter sc.<the method>

In [13]:
def gather_up_my_spark_session_infor_for_me(s):
  print("Apache Spark Version . . . . . . . . . . . ",s.version)
  print("Spark master . . . . . . . . . . . . . . . ",s.master)
  print("Core Python Version . . .  . . . . . . . . ",s.pythonVer)
  print("App Name . . . . . . . . . . . . . . . . . ",s.appName)
  print("spark URL  . . . . . . . . . . . . . . . . ",s.uiWebUrl)
  print("Spark Home . . . . . . . . . . . . . . . . ",s.sparkHome)
  print("Spark session started at   . . . . . . . . ",s.startTime)
  print("Spark defaultMinPartitions . . . . . . . . ",s.defaultMinPartitions)
  print("Spark defaultParallelism . . . . . . . . . ",s.defaultParallelism)
  # hadoopFile and sparkUser are methods, not attributes: hadoopFile reads a file
  # into an RDD (so it is not session info), and sparkUser has to be called
  print("Spark user . . . . . . . . . . . . . . . . ",s.sparkUser())

In [14]:
gather_up_my_spark_session_infor_for_me(sc)

<br>

# Let's look at Parquet Files:

The official format specification, I believe:
*  https://github.com/apache/parquet-format

In [18]:
# side note:  
# upload image to databricks DBFS
# it will show up under DBFS /FileStore/tables/<yourfilename>
# and then from there, IF you want to access the file for HTML image view:
#  /files/svm.jpg
# but for writing, it would be:
#  /FileStore/svm.jpg
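
The side note above boils down to a prefix swap, sketched here in plain Python. The helper name `filestore_to_url_path` is hypothetical, and the mapping is only the one described in the note (`/FileStore/...` for writing, `/files/...` for HTML viewing).

```python
def filestore_to_url_path(dbfs_path):
    """Map a DBFS path under /FileStore to the /files path used in HTML."""
    prefix = "/FileStore/"
    if not dbfs_path.startswith(prefix):
        raise ValueError("expected a path under /FileStore")
    return "/files/" + dbfs_path[len(prefix):]


print(filestore_to_url_path("/FileStore/svm.jpg"))
print(filestore_to_url_path("/FileStore/tables/parquet.gif"))
```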

Parquet is a columnar file format that provides optimizations to speed up queries and is a far more efficient file format than CSV or JSON.

Parquet stores binary data in a column-oriented way, where the values of each column are organized so that they are all adjacent, enabling better compression. It is especially good for queries which read particular columns from a “wide” (with many columns) table since only needed columns are read and IO is minimized.
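
To make the column-oriented idea concrete, here is a toy sketch in plain Python (no Parquet library involved). It pivots a few flight records into one list per column, then shows two effects: reading a single column touches far fewer values, and adjacent repeated values collapse nicely under run-length encoding. Real Parquet files are more involved (row groups, pages, dictionary encoding), so treat this as an illustration of the layout, not of the file format itself.

```python
from itertools import groupby

# four toy flight records, stored row by row (like CSV or JSON)
rows = [
    {"Year": 2008, "UniqueCarrier": "XE", "Dest": "CLE", "Distance": 1021},
    {"Year": 2008, "UniqueCarrier": "CO", "Dest": "EWR", "Distance": 1372},
    {"Year": 2008, "UniqueCarrier": "XE", "Dest": "CLE", "Distance": 1021},
    {"Year": 2008, "UniqueCarrier": "CO", "Dest": "IAH", "Distance": 224},
]

# the same data pivoted into one list per column
columns = {name: [row[name] for row in rows] for name in rows[0]}


def run_length_encode(values):
    """Collapse runs of equal adjacent values into (value, run_length) pairs."""
    return [(value, len(list(group))) for value, group in groupby(values)]


# a query on 'Dest' only needs that one list in the columnar layout,
# while the row layout forces a pass over every field of every row
values_touched_columnar = len(columns["Dest"])
values_touched_row_wise = sum(len(row) for row in rows)

year_runs = run_length_encode(columns["Year"])

print(values_touched_columnar, "values read (columnar) vs", values_touched_row_wise, "(row-wise)")
print("Year column, run-length encoded:", year_runs)
```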

<img src ='/files/tables/parquet.gif'>

In [21]:
dbutils.fs.ls("/tmp")

In [22]:
dbutils.fs.ls("/databricks-datasets")

In [23]:
# path to file (flight data)
path = "/tmp/flights_parquet/Origin=DFW/part-00004-tid-6689744057697969040-c960f890-badf-4220-9b06-f4058ede2b08-105-79.c000.snappy.parquet"

# read and display the parquet file
parquetDF = spark.read.parquet(path)
display(parquetDF.head(7))


Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,TailNum,ActualElapsedTime,CRSElapsedTime,AirTime,ArrDelay,DepDelay,Dest,Distance,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
2008,8,11,1,,645,,1023,XE,2004,,,158,,,,CLE,1021,,,1,C,0,,,,,
2008,6,17,2,653.0,700,1120.0,1136,CO,1130,N16650,207.0,216,177.0,-16.0,-7.0,EWR,1372,8.0,22.0,0,,0,,,,,
2008,8,14,4,2307.0,1905,231.0,2243,XE,2572,N13995,144.0,158,128.0,228.0,242.0,CLE,1021,5.0,11.0,0,,0,0.0,0.0,0.0,0.0,228.0
2008,6,11,3,1917.0,1918,2016.0,2028,CO,714,N76354,59.0,70,42.0,-12.0,-1.0,IAH,224,5.0,12.0,0,,0,,,,,
2008,8,20,3,1534.0,1307,1708.0,1420,XE,2214,N14943,94.0,73,53.0,168.0,147.0,IAH,224,17.0,24.0,0,,0,0.0,0.0,21.0,0.0,147.0
2008,6,12,4,1805.0,1805,2240.0,2245,CO,1140,N17620,215.0,220,193.0,-5.0,0.0,EWR,1372,8.0,14.0,0,,0,,,,,
2008,8,30,6,1625.0,1630,1737.0,1742,XE,2805,N16951,72.0,72,46.0,-5.0,-5.0,IAH,224,16.0,10.0,0,,0,,,,,


In [24]:
# side note
#   to completely remove a file:
#     -  dbutils.fs.rm("/tmp/databricks-df-example.parquet", True)

* Good link that talks about why parquet is awesome: https://databricks.com/glossary/what-is-parquet

In [26]:
# You have spark context running, but you also have sqlContext as well...
sqlContext  # confirm it is running...

In [27]:
# these are all the things i can 'do' to my DF:
list_out_object_methods(parquetDF)

In [28]:
# i want to know how many rows of data are in my parquet file:
parquetDF.count()

In [29]:
print(type(parquetDF))  # this is a DataFrame fyi 

In [30]:
# listing out the columns in the actual dataframe
parquetDF.columns

# very handy to use...

In [31]:
# output the unique values in the 'Dest' column:
parquetDF.select('Dest').distinct().show()  # show() prints only the first 20 rows by default

In [32]:
parquetDF.schema

In [33]:
list_out_object_methods(parquetDF)

```

All the methods on the DF that are possible:

agg
alias
approxQuantile
cache
checkpoint
coalesce
colRegex
collect
columns
corr
count
cov
createGlobalTempView
createOrReplaceGlobalTempView
createOrReplaceTempView
createTempView
crossJoin
crosstab
cube
describe
distinct
drop
dropDuplicates
drop_duplicates
dropna
dtypes
exceptAll
explain
fillna
filter
first
foreach
foreachPartition
freqItems
groupBy
groupby
head
hint
intersect
intersectAll
isLocal
isStreaming
is_cached
join
limit
localCheckpoint
na
orderBy
persist
printSchema
randomSplit
rdd
registerTempTable
repartition
repartitionByRange
replace
rollup
sample
sampleBy
schema
select
selectExpr
show
sort
sortWithinPartitions
sql_ctx
stat
storageLevel
subtract
summary
take
toDF
toJSON
toLocalIterator
toPandas
transform
union
unionAll
unionByName
unpersist
where
withColumn
withColumnRenamed
withWatermark
write
writeStream
```

##### Use SQL commands directly, which I really like:

*Create a temporary table and then we can query it*
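
A minimal sketch of that idea, assuming a notebook where `spark` and `parquetDF` already exist. The helper names `dest_count_sql` and `query_with_sql`, and the view name `flights_view`, are made up for illustration; `createOrReplaceTempView` is one of the DataFrame methods listed above.

```python
def dest_count_sql(view_name):
    """Build a query that counts flights per destination in a temp view."""
    return (
        f"SELECT Dest, COUNT(*) AS TotalFlights "
        f"FROM {view_name} "
        f"GROUP BY Dest "
        f"ORDER BY TotalFlights DESC"
    )


def query_with_sql(spark, df, view_name="flights_view"):
    """Register df as a temporary view, then query it with plain SQL."""
    df.createOrReplaceTempView(view_name)
    return spark.sql(dest_count_sql(view_name))


print(dest_count_sql("flights_view"))
# in a notebook cell you would then run something like:
#   display(query_with_sql(spark, parquetDF))
```

The temporary view only lives for the current Spark session, so nothing is written to storage.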

<br>
<br>

# Parquet Example #2

In [39]:
# lets look at a large file
# File uploaded to /FileStore/tables/userdata1.parquet

A good source of some parquet files:
* https://github.com/Teradata/kylo/tree/master/samples/sample-data/parquet

In [41]:
dbutils.fs.ls("dbfs:/FileStore/tables")    # you can see your file there

In [42]:
dataP = spark.read.parquet("dbfs:/FileStore/tables/userdata1.parquet")

# note: no inference needed here, the schema is stored in the Parquet file itself



# -> Number of rows in the file: 1000
# -> Column details:
# column#		column_name		hive_datatype
# =====================================================
# 1		registration_dttm 	    timestamp
# 2		id 			            int
# 3		first_name 		        string
# 4		last_name 		        string
# 5		email 			        string
# 6		gender 			        string
# 7		ip_address 		        string
# 8		cc 			            string
# 9		country 		        string
# 10	birthdate 		        string
# 11	salary 			        double
# 12	title 			        string
# 13	comments 		        string



In [43]:
dataP.show(10)

In [44]:
# I can poll the schema directly
dataP.schema

In [45]:
# feeling lazy ? 

# upload parquet file, read it into dataframe in python, and then use 
# the Databricks 'Download CSV' button (far right downward arrow) to
# download and view the csv form on your laptop...

In [46]:
dataP.count()  # 1,000 rows

<br>

# Let's examine JSON

In [49]:
# i manually uploaded this file called json.json
# sidenote: If your cluster is running Databricks Runtime 4.0 and above, you can read JSON 
# files in single-line or multi-line mode. In single-line mode, a file can be split into 
# many parts and read in parallel.

# lets confirm i can see it in my storage: 
display(dbutils.fs.ls("dbfs:/FileStore/tables/json.json"))

path,name,size
dbfs:/FileStore/tables/json.json,json.json,243


In [50]:
randomDF = spark.read.json("dbfs:/FileStore/tables/json.json")

In [51]:
# this is my json.json file i uploaded:
#  {"string":"string1","int":1,"array":[1,2,3],"dict": {"key": "value1"}}
#  {"string":"string2","int":2,"array":[2,4,6],"dict": {"key": "value2"}}
#  {"string":"string3","int":3,"array":[3,6,9],"dict": {"key": "value3", "extra_key": "extra_value3"}}


# but the cool part is that Spark infers the schema automatically
randomDF.printSchema()
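
For intuition on what the inference has to do, here is a plain-Python sketch (no Spark involved) over the same three records. It parses one JSON object per line, takes the union of the keys seen in the nested `dict` field, and fills in `None` where a record lacks a key, which is where the `null` entries come from when the DataFrame is displayed. This is a simplified model of the behaviour, not Spark's actual inference code.

```python
import json

# the three records, one JSON object per line (single-line mode)
json_lines = """\
{"string":"string1","int":1,"array":[1,2,3],"dict": {"key": "value1"}}
{"string":"string2","int":2,"array":[2,4,6],"dict": {"key": "value2"}}
{"string":"string3","int":3,"array":[3,6,9],"dict": {"key": "value3", "extra_key": "extra_value3"}}
"""

records = [json.loads(line) for line in json_lines.splitlines() if line.strip()]

# union of the nested keys seen in any record, sorted alphabetically
dict_fields = sorted({key for record in records for key in record["dict"]})

# one row per record, with None wherever a record lacks a field
dict_rows = [[record["dict"].get(field) for field in dict_fields] for record in records]

print(dict_fields)
for row in dict_rows:
    print(row)
```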

In [52]:
randomDF.show()

In [53]:
display(randomDF)

array,dict,int,string
"List(1, 2, 3)","List(null, value1)",1,string1
"List(2, 4, 6)","List(null, value2)",2,string2
"List(3, 6, 9)","List(extra_value3, value3)",3,string3


<br>

## Another JSON example:

In [56]:
import json
import requests

# You’ll need to make an API request to the JSONPlaceholder service, so just use the requests 
# package to do the heavy lifting.
response = requests.get("https://jsonplaceholder.typicode.com/todos")
todos = json.loads(response.text)


In [57]:
response.json()

In [58]:
todos == response.json()

In [59]:
type(todos)

In [60]:
todos[:4]

In [61]:
display(todos)

completed,id,title,userId
False,1,delectus aut autem,1
False,2,quis ut nam facilis et officia qui,1
False,3,fugiat veniam minus,1
True,4,et porro tempora,1
False,5,laboriosam mollitia et enim quasi adipisci quia provident illum,1
False,6,qui ullam ratione quibusdam voluptatem quia omnis,1
False,7,illo expedita consequatur quia in,1
True,8,quo adipisci enim quam ut ab,1
False,9,molestiae perspiciatis ipsa,1
True,10,illo est ratione doloremque quia maiores aut,1


<br>

## Another JSON example

In [64]:
dbutils.fs.put("/tmp/test.json", """
{"string":"string1","int":1,"array":[1,2,3],"dict": {"key": "value1"}}
{"string":"string2","int":2,"array":[2,4,6],"dict": {"key": "value2"}}
{"string":"string3","int":3,"array":[3,6,9],"dict": {"key": "value3", "extra_key": "extra_value3"}}
""", True)

In [65]:
testJsonData = sqlContext.read.json("/tmp/test.json")

display(testJsonData)

array,dict,int,string
"List(1, 2, 3)","List(null, value1)",1,string1
"List(2, 4, 6)","List(null, value2)",2,string2
"List(3, 6, 9)","List(extra_value3, value3)",3,string3


In [66]:
%sql 
    CREATE TEMPORARY VIEW jsonTable
    USING json
    OPTIONS (
      path "/tmp/test.json"
    )

In [67]:
%sql SELECT * FROM jsonTable


array,dict,int,string
"List(1, 2, 3)","List(null, value1)",1,string1
"List(2, 4, 6)","List(null, value2)",2,string2
"List(3, 6, 9)","List(extra_value3, value3)",3,string3


<br>

# Databricks Delta Examination

>  *lets look at a deeper examination of parquet and delta, and if we can make some improvements*

In [71]:
# note: outside Databricks, where display() is not available, you can call .show() on the DataFrame instead

flights = spark.read.format("csv") \
  .option("header", "true") \
  .option("inferSchema", "true") \
  .load("/databricks-datasets/asa/airlines/2008.csv")

# see how the schema is inferred ???

In [72]:
# write a parquet-based table using this flights data 
# i specifically partition by ORIGIN
# once i do this, it will spread files around, and breakout by folders of Origin

# flights.write.format("parquet").mode("overwrite").partitionBy("Origin").save("/tmp/flights_parquet_2")
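
With `partitionBy("Origin")`, the value of the partition column lives in the folder name (`Origin=DFW`) rather than inside the data files. That is why the single part file read earlier shows no `Origin` column. The sketch below pulls those `key=value` segments out of a path in plain Python; the helper name `partition_values` is hypothetical, and Spark's own partition discovery does more than this (type inference, escaping).

```python
def partition_values(path):
    """Extract Hive-style key=value partition segments from a file path."""
    values = {}
    for segment in path.split("/"):
        if "=" in segment:
            key, _, value = segment.partition("=")
            values[key] = value
    return values


example = (
    "/tmp/flights_parquet/Origin=DFW/"
    "part-00004-tid-6689744057697969040-c960f890-badf-4220-9b06-f4058ede2b08-105-79.c000.snappy.parquet"
)
print(partition_values(example))
```

Reading the top-level folder instead of a single file lets Spark add the partition column back.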

In [73]:
# your output files
#   display(dbutils.fs.ls("dbfs:/tmp/flights_parquet"))  # you can see your file there

In [74]:
# 8 core files per origin, example shown:
#  display(dbutils.fs.ls("dbfs:/tmp/flights_parquet/Origin=DFW/"))  # you can see your file there

In [75]:
# seeing display command in action, side note:
# from pyspark.sql.functions import avg
# diamonds_df = spark.read.csv("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv", header="true", inferSchema="true")
# display(diamonds_df.select("color","price").groupBy("color").agg(avg("price")))

In [76]:
# lets look at one single subFile

curiousDF = sqlContext.read.parquet("dbfs:/tmp/flights_parquet/Origin=DFW/part-00004-tid-6689744057697969040-c960f890-badf-4220-9b06-f4058ede2b08-105-79.c000.snappy.parquet")

print("This individual dataset has the following number of rows: ", curiousDF.count())

Once step 1 completes, the "flights" data contains details of US flights for one year (2008).

Next, in step 2, we run a query that gets the top 20 month/origin combinations with the highest total flights on the first day of the week.

In [78]:
# so when you open up the /tmp/flights_parquet folder, you see a few hundred folders, 
# one per origin airport (DFW, for example).  

# eight files per origin

# ex: /tmp/flights_parquet/Origin=DFW/part-00000-tid-1227084317432497623-9bdbeb42-5722-4f94-9755-6801a612974e-421-79.c000.snappy.parquet

#### step 2

In [80]:
# lets run a query:

from pyspark.sql.functions import count

# this reads the whole partitioned folder as one big DataFrame, so it consolidates everything 
flights_parquet = spark.read.format("parquet").load("/tmp/flights_parquet")

# query the huge file, breakout by day of week 1, groupby, and count totals
display(flights_parquet.filter("DayOfWeek=1").groupBy("Month","Origin").agg(count("*").alias("TotalFlights")).orderBy("TotalFlights", ascending=False).limit(20))

# this took 1.72 minutes ! 
# ran it again, and then it took 2.46 minutes


Month,Origin,TotalFlights
6,ATL,6046
3,ATL,6019
12,ATL,5800
9,ATL,5722
6,ORD,5241
3,ORD,5072
9,ORD,4931
7,ATL,4894
8,ATL,4821
4,ATL,4798


In [81]:
print(type(flights_parquet))

In [82]:
flights_parquet.count()
# thats over 7 million rows dude, pretty hard core for a quick check ...

In [83]:
# very very good command for seeing the schema you have in place for the DF
#############################
flights_parquet.printSchema()
#############################

In [84]:
flights_parquet.columns

#### step 3

Once step 2 completes, you can observe the latency with the standard "flights_parquet" table.

In step 3 and step 4, we do the same with a Databricks Delta table. This time, before running the query, we run the OPTIMIZE command with ZORDER to ensure data is optimized for faster retrieval.
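
Why clustering the data can help a filter like `DayOfWeek=1`: if each file carries min/max statistics for a column, a reader can skip files whose range cannot match. The toy model below shows the effect in plain Python. The file names and ranges are invented for illustration, and this is not how Delta is implemented internally, only the general data-skipping idea.

```python
def files_to_scan(file_stats, wanted_day):
    """Return the files whose [min, max] DayOfWeek range could contain wanted_day."""
    return [name for name, (low, high) in file_stats.items() if low <= wanted_day <= high]


# before clustering: every file holds a mix of all seven days
unclustered = {f"part-{i}": (1, 7) for i in range(7)}

# after clustering on DayOfWeek: each file covers a single day
clustered = {f"part-{i}": (i + 1, i + 1) for i in range(7)}

scanned_before = files_to_scan(unclustered, 1)
scanned_after = files_to_scan(clustered, 1)

print(len(scanned_before), "files scanned before clustering")
print(len(scanned_after), "file scanned after clustering")
```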

In [87]:

# Step 3: Write a Databricks Delta based table using flights data
flights.write.format("delta").mode("overwrite").partitionBy("Origin").save("/tmp/flights_delta")


In [88]:

# Step 3 Continued: OPTIMIZE the Databricks Delta table

display(spark.sql("DROP TABLE  IF EXISTS flights"))

display(spark.sql("CREATE TABLE flights USING DELTA LOCATION '/tmp/flights_delta'"))
                  
display(spark.sql("OPTIMIZE flights ZORDER BY (DayofWeek)"))


In [89]:
# Step 4 : Rerun the query from Step 2 and observe the latency

flights_delta = spark.read.format("delta").load("/tmp/flights_delta")

display(flights_delta.filter("DayOfWeek=1").groupBy("Month","Origin").agg(count("*").alias("TotalFlights")).orderBy("TotalFlights", ascending=False).limit(20))

# its faster now 


In [90]:

#  HMMMMMMMM

#  it went from taking:  1.72 minutes (and then 2.46 minutes on the rerun)
#  to taking about    :  42.35 seconds
#  about 2.4x faster than the first run, and about 3.5x faster than the rerun
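
The arithmetic behind that comparison, as a quick check. The timings are the ones recorded above; `speedup` is just a throwaway helper. These are single runs, so treat the ratios as rough.

```python
def speedup(before_seconds, after_seconds):
    """How many times faster the second timing is than the first."""
    return before_seconds / after_seconds


parquet_runs = [1.72 * 60, 2.46 * 60]   # the two Parquet timings, converted to seconds
delta_run = 42.35                        # the Delta timing, in seconds

for seconds in parquet_runs:
    print(f"{seconds:.1f}s -> {delta_run}s is {speedup(seconds, delta_run):.2f}x faster")
```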


<br>

## Appendix:  Access DBFS and looking around at functionality

In [93]:
display(dbutils.fs)

In [94]:

#  write a temp file to DBFS with python i/o apis
#  then print it out 
#  all python based (base notebook is python when created)

#write a file to DBFS using python i/o apis
with open("/dbfs/tmp/test_dbfs.txt", 'w') as f:
  f.write("Apache Spark is the bomb\n")
  f.write("Tom Bresee was the bomb\n")
  f.write("Now Databricks is the bomb\n")
  f.write("Goodbye.")

# read the file
with open("/dbfs/tmp/test_dbfs.txt", "r") as f_read:
  for line in f_read:
    print(line, end="")  # each line already ends with a newline
    
# df.write.text("/tmp/foo.txt")


In [95]:
# When you’re using Spark APIs, you reference files with "/mnt/training/file.csv" or 
# "dbfs:/mnt/training/file.csv". If you’re using local file APIs, you must provide 
# the path under /dbfs, for example: "/dbfs/mnt/training/file.csv". You cannot use 
# a path under /dbfs when using Spark APIs.
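
The two path styles differ only by prefix, so converting between them is a small string operation. A sketch in plain Python, with hypothetical helper names `to_local_path` and `to_spark_path`:

```python
def to_local_path(spark_path):
    """Translate a Spark-style DBFS path into the /dbfs path for local file APIs."""
    if spark_path.startswith("dbfs:/"):
        spark_path = spark_path[len("dbfs:"):]
    return "/dbfs" + spark_path


def to_spark_path(local_path):
    """Translate a /dbfs local path back into a dbfs:/ path for Spark APIs."""
    if not local_path.startswith("/dbfs/"):
        raise ValueError("expected a path under /dbfs")
    return "dbfs:" + local_path[len("/dbfs"):]


print(to_local_path("/mnt/training/file.csv"))
print(to_local_path("dbfs:/mnt/training/file.csv"))
print(to_spark_path("/dbfs/tmp/test_dbfs.txt"))
```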

In [96]:
%scala
// Now read the file you just created with the Scala programming language...

import scala.io.Source

val filename = "/dbfs/tmp/test_dbfs.txt"  // what you just created 

for (line <- Source.fromFile(filename).getLines()) {
  println(line)
}

In [97]:
# create a directory called foobar
dbutils.fs.mkdirs("/foobar/")

# now if you go to your main menu, and then click Import and Explore Data,
# you will see this new folder you just created ! 

In [98]:
# put the text into the file 
dbutils.fs.put("/foobar/baz.txt", "Hello, World!")

In [99]:
# thats awesome, i blew out the free edition in the first two days... 

In [100]:
# list out all the folders you have in DBFS ! 
# this is really a 'dir' level command ! 

display(dbutils.fs.ls("dbfs:/"))

path,name,size
dbfs:/FileStore/,FileStore/,0
dbfs:/cbp.vds/,cbp.vds/,0
dbfs:/databricks/,databricks/,0
dbfs:/databricks-datasets/,databricks-datasets/,0
dbfs:/databricks-results/,databricks-results/,0
dbfs:/delta/,delta/,0
dbfs:/foobar/,foobar/,0
dbfs:/kdd/,kdd/,0
dbfs:/local_disk0/,local_disk0/,0
dbfs:/ml/,ml/,0


In [101]:
# lets look around into the Parquet folder you created earlier ! ! ! 

display(dbutils.fs.ls("dbfs:/tmp/testParquet"))

path,name,size
dbfs:/tmp/testParquet/_committed_354797835800861187,_committed_354797835800861187,1205
dbfs:/tmp/testParquet/_committed_3720738684160984522,_committed_3720738684160984522,1217
dbfs:/tmp/testParquet/_committed_4775365216835712372,_committed_4775365216835712372,1216
dbfs:/tmp/testParquet/_committed_5658643342479558245,_committed_5658643342479558245,1223
dbfs:/tmp/testParquet/_committed_6765437422472151022,_committed_6765437422472151022,1211
dbfs:/tmp/testParquet/_committed_81539086910767789,_committed_81539086910767789,1211
dbfs:/tmp/testParquet/_started_1885843275169497,_started_1885843275169497,0
dbfs:/tmp/testParquet/_started_354797835800861187,_started_354797835800861187,0
dbfs:/tmp/testParquet/_started_3720738684160984522,_started_3720738684160984522,0
dbfs:/tmp/testParquet/_started_5539009013964348726,_started_5539009013964348726,0


In [102]:
# want to know something odd ?   
# you CAN'T put comments into the same cell as a filesystem magic command ! 
# it will error out, so it has to be a stand-alone cell ! 

In [103]:
# try:
#     import jira.client
#     JIRA_IMPORTED = True

# except ImportError:
#     JIRA_IMPORTED = False