In [1]:
%%pyspark

# Load the cats CSV from the capture container for a quick look.
# header=True tells the reader that the first row of the file holds the column names.
df = spark.read.load(
    'abfss://capture@splacceler5lmevhdeon4ym.dfs.core.windows.net/SeattlePublicLibrary/Library_books_about_cats.csv',
    format='csv',
    header=True,
)
display(df.limit(10))

In [3]:
%%pyspark

# Print the schema (column names and types) of the DataFrame read from the CSV
df.printSchema()

In [4]:
%%pyspark

from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Where the source CSV lives: storage account, container, and path inside the container
capture_account_name = 'splacceler5lmevhdeon4ym'  # storage account name
capture_container_name = 'capture'  # container name
capture_relative_path = 'SeattlePublicLibrary/Library_books_about_cats.csv'  # file path within the container

# Assemble the URI as abfss://<container>@<account>.dfs.core.windows.net/<relative path>
capture_adls_path = 'abfss://{}@{}.dfs.core.windows.net/{}'.format(
    capture_container_name,
    capture_account_name,
    capture_relative_path,
)
print('Primary storage account path: {}'.format(capture_adls_path))

In [5]:
%%pyspark

from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, DateType, TimestampType
# Explicit schema for the cats CSV. Every column is nullable; only bibnum and
# item_count are integers, and reportDate is read as a plain string.
csvSchema = (
    StructType()
    .add('bibnum', IntegerType(), True)
    .add('title', StringType(), True)
    .add('author', StringType(), True)
    .add('isbn', StringType(), True)
    .add('publication_year', StringType(), True)
    .add('publisher', StringType(), True)
    .add('subjects', StringType(), True)
    .add('item_type', StringType(), True)
    .add('item_collection', StringType(), True)
    .add('floating_item', StringType(), True)
    .add('item_location', StringType(), True)
    .add('reportDate', StringType(), True)
    .add('item_count', IntegerType(), True)
)

# Read the CSV with the schema above; the first row is treated as a header.
CheckByTPI_capture_df = (
    spark.read
    .format('csv')
    .schema(csvSchema)
    .option('header', 'True')
    .load(capture_adls_path)
)

# Preview the first 10 rows
display(CheckByTPI_capture_df.limit(10))



In [17]:
%%pyspark

from pyspark.sql.functions import to_date, to_timestamp, col, date_format, current_timestamp, date_trunc

# Build the typed DataFrame that is written out later:
#   report_date - reportDate parsed with MM/dd/yyyy into a DATE; the string column is dropped
#   load_date   - timestamp of this load, truncated to whole seconds
#
# load_date is taken straight from current_timestamp() instead of being formatted to a
# string and parsed back. The previous round-trip used the pattern
# 'MM/dd/yyyy hh:mm:ss aa'; Spark 3 datetime patterns accept only a single 'a' for
# am/pm, so 'aa' is rejected there, and a 12-hour local-time string is ambiguous during
# a DST fall-back hour. date_trunc('second', ...) keeps the same whole-second precision
# the round-trip produced.
df_final = (
    CheckByTPI_capture_df
    .withColumn("report_date", to_date(col("reportDate"), "MM/dd/yyyy"))
    .drop("reportDate")
    .withColumn("load_date", date_trunc("second", current_timestamp()))
)

In [18]:
%%pyspark

# Print the schema of the transformed DataFrame
df_final.printSchema()

# Preview the first 10 rows
display(df_final.limit(10))


In [19]:
%%pyspark

from pyspark.sql import SparkSession
from pyspark.sql.types import *

# Where the parquet output goes: storage account, container, and folder inside the container
compose_account_name = 'splacceler5lmevhdeon4ym'  # storage account name
compose_container_name = 'compose'  # container name
compose_relative_path = 'SeattlePublicLibrary/LibraryBooksAboutCats/'  # folder path within the container (trailing slash included)

# Assemble the URI as abfss://<container>@<account>.dfs.core.windows.net/<relative path>
compose_adls_path = 'abfss://{}@{}.dfs.core.windows.net/{}'.format(
    compose_container_name,
    compose_account_name,
    compose_relative_path,
)
print('Primary storage account path: {}'.format(compose_adls_path))

In [20]:
%%pyspark

# Full output location: the compose folder path followed by the parquet name
compose_parquet_path = '{}{}'.format(compose_adls_path, 'booksaboutcats.parquet')

print('parquet file path: {}'.format(compose_parquet_path))

In [21]:
%%pyspark

# Write df_final as parquet to the compose path, replacing any existing output there
df_final.write.mode('overwrite').parquet(compose_parquet_path)

In [10]:
%%sql

-- Create the SeattlePublicLibrary database unless one with that name already exists
CREATE DATABASE IF NOT EXISTS SeattlePublicLibrary

In [15]:
%%sql

-- Create table SeattlePublicLibrary.library_books_about_cats unless it already exists.
-- The table uses the parquet data source and points at the path given in OPTIONS.
CREATE TABLE IF NOT EXISTS SeattlePublicLibrary.library_books_about_cats (
    bibnum            INTEGER,
    title             STRING,
    author            STRING,
    isbn              STRING,
    publication_year  STRING,
    publisher         STRING,
    subjects          STRING,
    item_type         STRING,
    item_collection   STRING,
    floating_item     STRING,
    item_location     STRING,
    report_date       DATE,
    item_count        INTEGER,
    load_date         TIMESTAMP
)
USING PARQUET
OPTIONS (path 'abfss://compose@splacceler5lmevhdeon4ym.dfs.core.windows.net/SeattlePublicLibrary/LibraryBooksAboutCats/booksaboutcats.parquet')

In [14]:
%%sql

-- Cleanup (intentionally left disabled): uncomment the next line to drop the table
--DROP TABLE SeattlePublicLibrary.library_books_about_cats