# Data Pipelines
#### Silicon Slayers

Source: [Databricks](https://docs.databricks.com/en/getting-started/data-pipeline-get-started.html#:~:text=A%20data%20pipeline%20implements%20the,data%20that%20users%20can%20consume)



#### Before we begin
Start your Databricks cluster and attach this notebook to it; the code below relies on the notebook's `spark` session.

### What is a data pipeline? 
A data pipeline is a process for moving data from sources (such as an application or UI) to targets (such as a data warehouse).

Data is extracted from the source, "flows" through the pipeline, where any necessary transformations are applied, and is loaded into the target.

A key feature of data pipelines is automation; data can be loaded in batches or in (near) real time. 
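
As a rough illustration of the extract, transform, load flow described above, here is a minimal sketch in plain Python. The sample records, the function names (`extract`, `transform`, `load`, `run_pipeline`), and the list standing in for a warehouse table are all hypothetical; a real pipeline would read from and write to actual systems.

```python
import csv
import io

# Hypothetical source data: a tab-separated export from an application.
RAW_SOURCE = "song_id\ttitle\tduration\nS1\tIntro\t61.5\nS2\tOutro\t\n"

def extract(raw_text):
    """Read tab-separated rows from the source into dictionaries."""
    return list(csv.DictReader(io.StringIO(raw_text), delimiter="\t"))

def transform(rows):
    """Drop rows without a duration and convert the duration to a float."""
    return [
        {**row, "duration": float(row["duration"])}
        for row in rows
        if row["duration"]
    ]

def load(rows, target):
    """Append the cleaned rows to the target (a list standing in for a warehouse table)."""
    target.extend(rows)
    return len(rows)

def run_pipeline(raw_text, target):
    """Run the three stages in order and return the number of rows loaded."""
    return load(transform(extract(raw_text)), target)

warehouse_table = []
loaded = run_pipeline(RAW_SOURCE, warehouse_table)
print(loaded, warehouse_table)
```

Scheduling `run_pipeline` to run on a timer would give batch loading; calling it as each record arrives would approximate the (near) real-time case.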

### What is ETL? 
A common abbreviation is ETL, which stands for Extract, Transform, Load. ELT (Extract, Load, Transform), where raw data is loaded first and then transformed inside the target, is also becoming more popular with cloud warehouses.
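
To make the ELT ordering concrete, the sketch below loads raw records first and only then transforms them with SQL inside the target. It is an illustration under assumptions: an in-memory SQLite database stands in for a cloud warehouse, and the table names (`raw_songs`, `clean_songs`) and sample rows are hypothetical.

```python
import sqlite3

# Hypothetical raw records as they arrive from a source system (all strings).
raw_rows = [("S1", " Intro ", "61.5"), ("S2", "Outro", "")]

# In-memory database standing in for a cloud warehouse.
conn = sqlite3.connect(":memory:")

# Load: copy the raw data into the warehouse untouched.
conn.execute("CREATE TABLE raw_songs (song_id TEXT, title TEXT, duration TEXT)")
conn.executemany("INSERT INTO raw_songs VALUES (?, ?, ?)", raw_rows)

# Transform: clean the data with SQL inside the warehouse, after loading.
conn.execute(
    """
    CREATE TABLE clean_songs AS
    SELECT song_id, TRIM(title) AS title, CAST(duration AS REAL) AS duration
    FROM raw_songs
    WHERE duration <> ''
    """
)

clean_rows = conn.execute("SELECT * FROM clean_songs ORDER BY song_id").fetchall()
print(clean_rows)
```

Because the raw table is kept, the transformation can be changed and re-run later without extracting from the source again, which is one reason this ordering suits warehouses with cheap storage and compute.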

### What are common ETL tools? 
- Apache (Airflow, Kafka, Spark)  
- dbt (Data Build Tool)  
- Informatica  
- AWS Glue  
- Google Cloud Dataflow  
- Microsoft Azure Data Factory  


In [None]:
from pyspark.sql.types import DoubleType, IntegerType, StringType, StructType, StructField

# Define variables used in the code below
file_path = "/databricks-datasets/songs/data-001/"
table_name = "raw_song_data"
checkpoint_path = "/tmp/pipeline_get_started/_checkpoint/song_data"

schema = StructType(
  [
    StructField("artist_id", StringType(), True),
    StructField("artist_lat", DoubleType(), True),
    StructField("artist_long", DoubleType(), True),
    StructField("artist_location", StringType(), True),
    StructField("artist_name", StringType(), True),
    StructField("duration", DoubleType(), True),
    StructField("end_of_fade_in", DoubleType(), True),
    StructField("key", IntegerType(), True),
    StructField("key_confidence", DoubleType(), True),
    StructField("loudness", DoubleType(), True),
    StructField("release", StringType(), True),
    StructField("song_hotnes", DoubleType(), True),
    StructField("song_id", StringType(), True),
    StructField("start_of_fade_out", DoubleType(), True),
    StructField("tempo", DoubleType(), True),
    StructField("time_signature", DoubleType(), True),
    StructField("time_signature_confidence", DoubleType(), True),
    StructField("title", StringType(), True),
    StructField("year", IntegerType(), True),
    StructField("partial_sequence", IntegerType(), True)
  ]
)

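# Ingest the tab-separated song files with Auto Loader and write them to a table
(spark.readStream
  .format("cloudFiles")                  # Auto Loader source
  .schema(schema)                        # apply the explicit schema defined above
  .option("cloudFiles.format", "csv")
  .option("sep","\t")                    # fields are tab-separated
  .load(file_path)
  .writeStream
  .option("checkpointLocation", checkpoint_path)  # tracks progress between runs
  .trigger(availableNow=True)            # process all available data, then stop
  .toTable(table_name)                   # start the stream and write to the table
)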

Out[3]: <pyspark.sql.streaming.query.StreamingQuery at 0x7fc515071ca0>
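
The checkpoint location and the `availableNow` trigger in the cell above work together: each run processes whatever input has not been processed yet and then stops. The plain-Python sketch below imitates that behavior only conceptually; it is not how Auto Loader is implemented, and the function `run_available_now`, the file names, and the rows are hypothetical.

```python
def run_available_now(source_files, checkpoint, table):
    """Process every file not yet recorded in the checkpoint, then stop."""
    new_files = sorted(name for name in source_files if name not in checkpoint)
    for name in new_files:
        table.extend(source_files[name])  # append the file's rows to the target table
        checkpoint.add(name)              # record progress so the file is not reprocessed
    return new_files

# Hypothetical source directory: file name -> rows in that file.
source_files = {"part-0001.tsv": ["row a", "row b"]}
checkpoint = set()   # stands in for the checkpoint location
table = []           # stands in for the target table

first_run = run_available_now(source_files, checkpoint, table)

# A new file arrives between runs; the second run picks up only that file.
source_files["part-0002.tsv"] = ["row c"]
second_run = run_available_now(source_files, checkpoint, table)

print(first_run, second_run, table)
```

Re-running the ingestion cell behaves in a similar spirit: as long as the same checkpoint path is used, files that were already ingested are skipped.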
