Delta Lake로 데이터를 수집하도록 자동 로더 구성하기

In [None]:
# Column helpers used to tag every ingested row with its origin and load time
from pyspark.sql.functions import input_file_name, current_timestamp

# Source location, plus per-user table and checkpoint names.
# The user name has every non-alphanumeric character replaced by "_" so it is
# safe to embed in a table name and a path.
file_path = "/databricks-datasets/structured-streaming/events"
username = spark.sql("SELECT regexp_replace(current_user(), '[^a-zA-Z0-9]', '_')").first()[0]
table_name = f"{username}_etl_quickstart"
checkpoint_path = f"/tmp/{username}/_checkpoint/etl_quickstart"

# Start from a clean slate: remove the table and checkpoint of a previous run
spark.sql(f"DROP TABLE IF EXISTS {table_name}")
dbutils.fs.rm(checkpoint_path, True)

# Stage 1 - Auto Loader ("cloudFiles") source reading JSON files.
# The inferred schema is tracked in the same directory as the checkpoint.
raw_events = (
    spark.readStream.format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", checkpoint_path)
    .load(file_path)
)

# Stage 2 - keep every source column and add the two audit columns
tagged_events = raw_events.select(
    "*",
    input_file_name().alias("source_file"),
    current_timestamp().alias("processing_time"),
)

# Stage 3 - write to the target table using the availableNow trigger
(
    tagged_events.writeStream
    .option("checkpointLocation", checkpoint_path)
    .trigger(availableNow=True)
    .toTable(table_name)
)

스토리지 계정으로 비구조적 데이터 수집하기

In [None]:
### Mount the storage container through Spark ###

# Where to find each placeholder value in the Azure portal:
#   application id : Azure Active Directory > App registrations > Overview > Application (client) ID
#   secret         : Azure Active Directory > App registrations > Certificates & secrets > client secret [Value]
#   tenant id      : Azure Active Directory > App registrations > Overview > Directory (tenant) ID
# NOTE(review): the placeholders are meant to be filled in by hand; consider
# reading the client secret from a secret store instead of pasting it here.

# OAuth client-credentials settings handed to the ABFS driver for this mount
configs = {
    "fs.azure.account.auth.type": "OAuth",
    "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
    "fs.azure.account.oauth2.client.id": "<application-id>",
    "fs.azure.account.oauth2.client.secret": "<client-secret>",
    "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenant-id>/oauth2/token",
    "fs.azure.createRemoteFileSystemDuringInitialization": "true",
}

# Expose the container under /mnt/flightdata
dbutils.fs.mount(
    source="abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/",
    mount_point="/mnt/flightdata",
    extra_configs=configs,
)

In [None]:
### Convert CSV to Parquet from a Databricks notebook ###

# Read every CSV file at the root of the DBFS mount created earlier.
# The first row is used as the header and column types are inferred.
csv_reader = spark.read.format('csv').options(header='true', inferschema='true')
flightDF = csv_reader.load("/mnt/flightdata/*.csv")
display(flightDF)

# Persist the same rows as Parquet for easier querying.
# "append" mode adds to whatever is already stored at the target path.
parquet_writer = flightDF.write.mode("append")
parquet_writer.parquet("/mnt/flightdata/parquet/flights")
print("Done")

데이터 살펴보기

In [None]:
import os.path
import IPython
from pyspark.sql import SQLContext

# Show what is stored at the root of the mounted container
mount_listing = dbutils.fs.ls("/mnt/flightdata")
display(mount_listing)

In [None]:
# Write a small text file at the mount root; the third argument (True) is the
# overwrite flag of dbutils.fs.put
hello_path = "/mnt/flightdata/1.txt"
dbutils.fs.put(hello_path, "Hello, World!", True)

# Kept as the final expression so the notebook renders the Parquet output listing
dbutils.fs.ls("/mnt/flightdata/parquet/flights")

In [None]:
# Copy this into a Cmd cell in your notebook.
# Load one CSV from the mount (first row is the header, column types are inferred).
# NOTE: check that this path matches your storage layout before running!
acDF = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/folder1/On_Time.csv/On_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1")

# Overwrite the target so that re-running this cell does not fail with
# "path already exists" (the default save mode raises when the output exists).
acDF.write.mode("overwrite").parquet('/mnt/flightdata/parquet/airlinecodes')

# read the existing parquet file for the flights database that was created earlier
# (Parquet stores its own schema, so the CSV-only header/inferschema options
# are not passed to this reader)
flightDF = spark.read.format('parquet').load("/mnt/flightdata/parquet/flights")

# print the schema of the dataframes
acDF.printSchema()
flightDF.printSchema()

# print the flight database size
print("Number of flights in the database: ", flightDF.count())

# show up to 100 rows of acDF and up to 20 rows of flightDF, without truncating
# long values (show() defaults to 20 rows; run df.show(n) for the first n rows)
acDF.show(100, False)
flightDF.show(20, False)

# Display to run visualizations
# preferably run this in a separate cmd cell
display(flightDF)

In [None]:
# Run each of these queries, preferably in a separate cmd cell for separate analysis.

# Register both Parquet outputs as temporary SQL views
FlightTable = spark.read.parquet('/mnt/flightdata/parquet/flights')
AirlineCodes = spark.read.parquet('/mnt/flightdata/parquet/airlinecodes')
FlightTable.createOrReplaceTempView('FlightTable')
AirlineCodes.createOrReplaceTempView('AirlineCodes')

# Count the rows recorded for January and for February 2016.
# NOTE(review): both queries read the AirlineCodes view, not FlightTable -
# confirm this is the intended source for the "total flights" figures.
out1 = spark.sql("SELECT * FROM AirlineCodes WHERE Month=1 AND Year = 2016 ")
out2 = spark.sql("SELECT * FROM AirlineCodes WHERE Month=2 AND Year = 2016 ")
NumJan2016Flights = out1.count()
NumFeb2016Flights = out2.count()
print("Jan 2016: ", NumJan2016Flights, " Feb 2016: ", NumFeb2016Flights)

Total = NumJan2016Flights + NumFeb2016Flights
print("Total flights combined: ", Total)

# Optional query (disabled): list out all the airports in Texas
#out = spark.sql(
#    "SELECT distinct(OriginCityName) FROM FlightTable where OriginStateName = 'Texas'")
#print('Airports in Texas: ', out.show(100))

# Optional query (disabled): find all airlines that fly from Texas
#out1 = spark.sql(
#    "SELECT distinct(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
#print('Airlines that fly to/from Texas: ', out1.show(100, False))

DBFS(Databricks File System)에 업로드된 파일로 테이블 생성하기

In [None]:
# Where the uploaded file lives in DBFS and which reader to use for it
file_location = "/FileStore/tables/nyc_taxi.csv"
file_type = "csv"

# CSV reader settings
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

# These options apply to CSV files; for other file types they will be ignored.
csv_options = {
    "inferSchema": infer_schema,
    "header": first_row_is_header,
    "sep": delimiter,
}

df = (
    spark.read.format(file_type)
    .options(**csv_options)
    .load(file_location)
)

display(df)

In [None]:
# Register the DataFrame as a temporary view so it can be queried with SQL.
# A temp view is only usable from within this notebook.

temp_table_name = "nyc_taxi_csv"

df.createOrReplaceTempView(temp_table_name)

In [None]:
/* SQL cell: return every row of the view registered under the name nyc_taxi_csv */

select * from `nyc_taxi_csv`

In [None]:
# A temp view is only available to this particular notebook.
# To let other users query the data, the DataFrame is also saved as a table.
# Once saved, the table persists across cluster restarts and can be queried by
# different users from different notebooks.
# Save as a permanent table on the current cluster.
# NOTE(review): this name is identical to the temp view name "nyc_taxi_csv"
# used earlier in this notebook - confirm which object unqualified queries
# resolve to, or choose a distinct table name.

permanent_table_name = "nyc_taxi_csv"

# Persist the DataFrame as a Parquet-format table under the chosen name
df.write.format("parquet").saveAsTable(permanent_table_name)

Delta Table 생성 및 데이터 삽입/수정/삭제, 테이블 삭제까지

In [None]:
/* SET UP: per the original author's note, the included notebook defines USERNAME, USERHOME and DATABASE scoped to each user. */
%run ../Includes/Classroom-Setup-2.1

In [None]:
/* CREATE TABLE */
/* Managed table
 * Azure Databricks manages both the [metadata] and the [data] of a managed table, so dropping the table also deletes the underlying data.
 * The data of a managed table is stored under the LOCATION of the database it is registered in.
 */

CREATE TABLE students (
  id    INT,
  name  STRING,
  value DOUBLE
);

/*
Alternative form that only creates the table when it does not exist yet:
CREATE TABLE IF NOT EXISTS students 
  (id INT, name STRING, value DOUBLE)
*/

In [None]:
/* CREATE TABLE */
/* Unmanaged (external) table
 * Azure Databricks manages only the [metadata] of an unmanaged (external) table, so dropping the table does not affect the underlying data.
 * Always specify LOCATION when creating an unmanaged table.
 */

/* The table is named delta_newtable so that it matches the INSERT and SELECT
 * statements in this notebook that read and write delta_newtable.
 * Replace the LOCATION placeholder with a real storage path before running.
 */
create table delta_newtable(
		num int,
		name string,
		date date
	)using delta
LOCATION '[경로]'

In [None]:
/* INSERT */

/* One row for the external Delta table; the date column receives today's date. */
INSERT INTO delta_newtable VALUES(1, 'MADS', CURRENT_DATE());

/* Three single-row inserts into students, one statement per row. */
INSERT INTO students VALUES (1, "Yve", 1.0);
INSERT INTO students VALUES (2, "Omar", 2.5);
INSERT INTO students VALUES (3, "Elia", 3.3);

/*
Multi-row form (disabled): inserts several rows with a single statement.
INSERT INTO students
VALUES 
  (4, "Ted", 4.7),
  (5, "Tiffany", 5.5),
  (6, "Vini", 6.3)
*/

In [None]:
/* SELECT: return every row of each of the two tables */
SELECT * FROM delta_newtable;
SELECT * FROM students;

In [None]:
/* UPDATE: add 1 to value for every student whose name starts with "T" */
/* NOTE(review): the only inserts that add names starting with "T" are inside the
 * commented-out multi-row INSERT - confirm whether that insert should be enabled,
 * otherwise this statement may match no rows. */
UPDATE students 
SET value = value + 1
WHERE name LIKE "T%";

In [None]:
/* MERGE - step 1: stage the change set */
/* Each row carries the student columns plus a "type" column naming the action
 * that the MERGE statement should take for that row. */

CREATE OR REPLACE TEMPORARY VIEW updates (
  id,
  name,
  value,
  type
) AS
VALUES
  (2, "Omar", 15.2, "update"),
  (3, "", null, "delete"),
  (7, "Blue", 7.7, "insert"),
  (11, "Diya", 8.8, "update");

/* Preview the staged rows */
SELECT *
FROM updates;

In [None]:
/*
- Inserts, updates and deletes could each be run as their own statement, but that produces 3 separate transactions; if any one of them fails, the data may be left in an invalid state.
- Here the three kinds of change are combined and applied in a single statement.
- A MERGE statement needs at least one field to match on, and each WHEN MATCHED / WHEN NOT MATCHED clause may carry additional conditions.
- Rows are paired on id; the "type" column of the source row selects the action.
  A source row that has no matching id and whose type is not "insert" is not applied.
*/
MERGE INTO students b
USING updates u
  ON b.id = u.id
WHEN MATCHED AND u.type = "update" THEN
  UPDATE SET *
WHEN MATCHED AND u.type = "delete" THEN
  DELETE
WHEN NOT MATCHED AND u.type = "insert" THEN
  INSERT *

In [None]:
/* DELETE: remove every student row whose value is greater than 6 */

DELETE FROM students 
WHERE value > 6 ;

In [None]:
/* DROP TABLE: remove the students table */
DROP TABLE students;

/* Optional clean-up (disabled): delete the tables and files created so far
%python
DA.cleanup()

*/