# Dataframes

In [0]:
%fs ls "/databricks-datasets/samples/"

path,name,size
dbfs:/databricks-datasets/samples/adam/,adam/,0
dbfs:/databricks-datasets/samples/data/,data/,0
dbfs:/databricks-datasets/samples/docs/,docs/,0
dbfs:/databricks-datasets/samples/lending_club/,lending_club/,0
dbfs:/databricks-datasets/samples/newsgroups/,newsgroups/,0
dbfs:/databricks-datasets/samples/people/,people/,0
dbfs:/databricks-datasets/samples/population-vs-price/,population-vs-price/,0


In [0]:
df = spark.read \
.format("csv") \
.option("header", "true") \
.option("inferSchema", "true") \
.load("/databricks-datasets/samples/population-vs-price/data_geo.csv")

In [0]:
display(df.head(5))
df.dtypes

2014 rank,City,State,State Code,2014 Population estimate,2015 median sales price
101,Birmingham,Alabama,AL,212247,162.9
125,Huntsville,Alabama,AL,188226,157.7
122,Mobile,Alabama,AL,194675,122.5
114,Montgomery,Alabama,AL,200481,129.0
64,Anchorage[19],Alaska,AK,301010,


In [0]:
df.columns

In [0]:
df = df.withColumnRenamed("2014 rank", "Rank") \
.withColumnRenamed("2014 Population estimate","Population")\
.withColumnRenamed("2015 median sales price","Price")\
.withColumnRenamed("State Code","State_code")
display(df)

Rank,City,State,State_code,Population,Price
101,Birmingham,Alabama,AL,212247.0,162.9
125,Huntsville,Alabama,AL,188226.0,157.7
122,Mobile,Alabama,AL,194675.0,122.5
114,Montgomery,Alabama,AL,200481.0,129.0
64,Anchorage[19],Alaska,AK,301010.0,
78,Chandler,Arizona,AZ,254276.0,
86,Gilbert[20],Arizona,AZ,239277.0,
88,Glendale,Arizona,AZ,237517.0,
38,Mesa,Arizona,AZ,464704.0,
148,Peoria,Arizona,AZ,166934.0,


In [0]:
pdf = df.toPandas()
print(type(pdf))
pdf

Unnamed: 0,Rank,City,State,State_code,Population,Price
0,101,Birmingham,Alabama,AL,212247.0,162.9
1,125,Huntsville,Alabama,AL,188226.0,157.7
2,122,Mobile,Alabama,AL,194675.0,122.5
3,114,Montgomery,Alabama,AL,200481.0,129.0
4,64,Anchorage[19],Alaska,AK,301010.0,
...,...,...,...,...,...,...
289,107,Tacoma,Washington,WA,205159.0,
290,145,Vancouver,Washington,WA,169294.0,
291,279,Green Bay,Wisconsin,WI,104891.0,
292,83,Madison,Wisconsin,WI,245691.0,226.2


In [0]:
import numpy as np
pdf['Price'] = pdf['Price'].fillna(0)
pdf['Population'] = pdf['Population'].fillna(0)
prices = pdf['Price']
prices = np.array(prices)
price_mean = prices.mean()
print(price_mean)

Populations = pdf['Population']
Populations = np.array(Populations)
Pop_mean = Populations.mean()
print(Pop_mean)

In [0]:
# df = df.fillna(0,['Population', "Price"])
df = df.na.fill({
  "Population":Pop_mean,
  "Price": price_mean
})
df.show()

In [0]:
from pyspark.sql.functions import col , column
df = df.withColumn("bus_val", df['Price'] * df['Population'])
df.show()

In [0]:
mydf = df.select(["State_code", "Price"]).groupBy("State_code").mean().orderBy("State_code")
mydf.show()

In [0]:
df.where("City='Springfield'").show()

In [0]:
df.printSchema()

In [0]:
df = df.withColumn("bus_val", df.bus_val.cast("int"))
df.show()

In [0]:
# UDF
def getPercentage(value=None, percentage=None):
  if value is None or value == "" or value == 0:
    return 0
  
  if percentage is None or percentage == "" or percentage == 0:
    return value
  
  final_val = ((value*percentage) / 100)
  return int(final_val)


# Unit testing
print(getPercentage(value=100, percentage=10))
print(getPercentage(value="", percentage=""))
print(getPercentage(value=0, percentage=0))
print(getPercentage(value=10, percentage=0))
print(getPercentage(value=0, percentage=10))
print(getPercentage(value=None, percentage=None))
print(getPercentage(value=10, percentage=None))
print(getPercentage(value=None, percentage=10))

In [0]:
spark.udf.register("udf_getPercentage", getPercentage)

In [0]:
df.createOrReplaceTempView("df_view")

In [0]:
%sql
-- show views
select *, udf_getPercentage(bus_val, 60) as est_cost, udf_getPercentage(bus_val, 30) as opt_cost from df_view

Rank,City,State,State_code,Population,Price,bus_val,est_cost,opt_cost
101,Birmingham,Alabama,AL,212247,162.9,34575036,20745021,10372510
125,Huntsville,Alabama,AL,188226,157.7,29683240,17809944,8904972
122,Mobile,Alabama,AL,194675,122.5,23847687,14308612,7154306
114,Montgomery,Alabama,AL,200481,129.0,25862049,15517229,7758614
64,Anchorage[19],Alaska,AK,301010,78.32653061224488,23577068,14146240,7073120
78,Chandler,Arizona,AZ,254276,78.32653061224488,19916556,11949933,5974966
86,Gilbert[20],Arizona,AZ,239277,78.32653061224488,18741737,11245042,5622521
88,Glendale,Arizona,AZ,237517,78.32653061224488,18603882,11162329,5581164
38,Mesa,Arizona,AZ,464704,78.32653061224488,36398652,21839191,10919595
148,Peoria,Arizona,AZ,166934,78.32653061224488,13075361,7845216,3922608


In [0]:
mdf = spark.sql('select *, udf_getPercentage(bus_val, 60) as est_cost, udf_getPercentage(bus_val, 30) as opt_cost from df_view')
mdf.show()

In [0]:
mdf.write.saveAsTable("bus_est")

In [0]:
import os
os.listdir("/tmp/")

In [0]:
# mdf.write.repartition(1).format("csv").mode("overwrite").save("/temp/df/mdf.csv")
mdf.write.format("csv").save("/tmp/zipco1")

In [0]:
%fs rm "dbfs:/temp/df/mdf/part-00000-tid-111925865517233280-485573fd-6ac9-498d-ac71-a0befe2698ca-31-1-c000.csv"

In [0]:
%fs ls "dbfs:/temp/df/mdf"

In [0]:
dbutils.fs.rm("dbfs:/temp/")

# DBUTILS

In [0]:
dbutils.help()

In [0]:
dbutils.widgets.help()
# %fs

In [0]:
%fs cp "/data/student_scores.csv" "/data/student.csv"

In [0]:
%fs rm /data/student.csv

In [0]:
%fs 

ls "/data/"

path,name,size
dbfs:/data/student_scores.csv,student_scores.csv,214


In [0]:
%sh
ls -ll /mnt

## Mount

Help : [https://docs.databricks.com/data/data-sources/azure/azure-storage.html#mount-azure-blob-storage-containers-to-dbfs]

In [0]:
storage_account_name = "adalsmanojdemo"
container_name = "input"
key_name = "OWLpKpR7VLha0sJH/UwReLeLBjvE9L7Sb1T0sFUmOrUp9uzCl+/Jpu6YU+Zmf89v2JTSbOzEtykZ+GwIF4y+0w==" # Accesskey of the container
conf_key = f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net"
mount_name ="demo_input"

dbutils.fs.mount(
  source = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net",
  mount_point = f"/mnt/{mount_name}",
  extra_configs = {"fs.azure.account.key.adalsmanojdemo.blob.core.windows.net":"OWLpKpR7VLha0sJH/UwReLeLBjvE9L7Sb1T0sFUmOrUp9uzCl+/Jpu6YU+Zmf89v2JTSbOzEtykZ+GwIF4y+0w=="})

In [0]:
%fs
ls /mnt/demo_input/abcd_test.csv

path,name,size
dbfs:/mnt/demo_input/abcd_test.csv/_SUCCESS,_SUCCESS,0
dbfs:/mnt/demo_input/abcd_test.csv/_committed_1950993658143010360,_committed_1950993658143010360,112
dbfs:/mnt/demo_input/abcd_test.csv/_started_1950993658143010360,_started_1950993658143010360,0
dbfs:/mnt/demo_input/abcd_test.csv/part-00000-tid-1950993658143010360-167836a5-15f5-41a9-9bb7-45d8729b6435-13-1-c000.csv,part-00000-tid-1950993658143010360-167836a5-15f5-41a9-9bb7-45d8729b6435-13-1-c000.csv,507


In [0]:
adf = spark.read.csv("dbfs:/mnt/demo_input/ABCD.csv", header=True)
adf.show()

In [0]:
adf.write.csv("/mnt/demo_input/abcd_test.csv")

In [0]:
from pyspark.sql import *

In [0]:
constr="jdbc:sqlserver://harish-123.database.windows.net:1433;database=harish_db;user=dbadmin@harish-123;password=Welcome@123;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
mfrf = DataFrameWriter(mdf)
mfrf.jdbc(url=constr, table="bus_est")

In [0]:
sqldf = spark.read.format('jdbc').option("url", constr).option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").option('dbtable', "bus_est").load()
sqldf.show()

In [0]:
sqldf2 = spark.read.format('jdbc').option("url", constr).option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver").option('query', "select * from bus_est where State_code='AL'").load()
sqldf2.show()

In [0]:
dbutils.secrets.get("scope", "name")