![](imgs/datal.png)

- learn how to use the Databricks environment
  - create a cluster
  - switch on DBFS
  - upload the required CSV files
  - create a Delta table
- understand low-level design (LLD) for writing a good data engineering codebase
  - understand the factory pattern
  - create one abstract reader that uses the factory pattern
- design the codebase
  - required extractor, transformer, and loader classes
  - required utils class
- Spark concepts
  - SparkSession
  - row vs. column file formats
  - broadcast join
  - narrow vs. wide transformations
  - jobs, stages, and tasks
- Apache Spark concepts
  - repartition vs. coalesce
  - partitionBy vs. bucketing
  - Spark UI
  - data lake vs. data lakehouse
  - optimization techniques such as predicate pushdown and partition pruning
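
As a rough illustration of the "abstract reader that uses the factory pattern" item above, here is a minimal sketch built on an abstract base class. The `READERS` registry and the choice to pass `spark` into `get_data_frame` are assumptions made so the sketch is self-contained; in a Databricks notebook the global `spark` session would normally be used instead.

```python
from abc import ABC, abstractmethod


class DataSource(ABC):
    """Abstract reader: subclasses decide how a path becomes a DataFrame."""

    def __init__(self, path):
        self.path = path

    @abstractmethod
    def get_data_frame(self, spark):
        """Return a DataFrame read from self.path using the given SparkSession."""


class CSVDataSource(DataSource):
    def get_data_frame(self, spark):
        return spark.read.format("csv").option("header", True).load(self.path)


class ParquetDataSource(DataSource):
    def get_data_frame(self, spark):
        return spark.read.format("parquet").load(self.path)


# Hypothetical registry: maps a format name to the reader class that handles it.
READERS = {"csv": CSVDataSource, "parquet": ParquetDataSource}


def get_data_source(data_type, file_path):
    """Factory function: pick the reader class by name and instantiate it."""
    reader_cls = READERS.get(data_type)
    if reader_cls is None:
        raise ValueError(f"Unsupported data type: {data_type}")
    return reader_cls(file_path)
```

With this shape, supporting a new format means writing one subclass and adding one registry entry, while the calling code stays unchanged.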

___

- business logic to implement
   - customers who bought AirPods after buying an iPhone
   - customers who bought both AirPods and an iPhone
   - list all the products bought by customers after their initial purchase
   - determine the average time delay between buying an iPhone and buying AirPods for each customer
   - identify the top 3 selling products in each category by total revenue
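
Before writing the Spark transformations, it can help to pin down what the first two rules should return on a tiny example. The sketch below uses plain Python rather than Spark; the sample rows and the helper names `purchases_by_customer`, `airpods_right_after_iphone`, and `bought_both` are made up for illustration. It reads "after" as "the very next purchase", which is only one possible interpretation of the rule.

```python
from collections import defaultdict

# Hypothetical sample data: (customer_id, transaction_date, product_name)
transactions = [
    ("c1", "2024-01-01", "iPhone"),
    ("c1", "2024-01-05", "AirPods"),
    ("c2", "2024-01-02", "AirPods"),
    ("c2", "2024-01-09", "iPhone"),
    ("c3", "2024-01-03", "iPhone"),
    ("c3", "2024-01-04", "MacBook"),
]


def purchases_by_customer(rows):
    """Group product names per customer, ordered by transaction date."""
    grouped = defaultdict(list)
    for customer_id, _date, product in sorted(rows, key=lambda r: (r[0], r[1])):
        grouped[customer_id].append(product)
    return grouped


def airpods_right_after_iphone(rows):
    """Customers whose purchase immediately following an iPhone is AirPods."""
    result = set()
    for customer_id, products in purchases_by_customer(rows).items():
        # zip(products, products[1:]) pairs each purchase with the next one
        for current, following in zip(products, products[1:]):
            if current == "iPhone" and following == "AirPods":
                result.add(customer_id)
    return result


def bought_both(rows):
    """Customers who bought an iPhone and AirPods, in any order."""
    return {
        customer_id
        for customer_id, products in purchases_by_customer(rows).items()
        if {"iPhone", "AirPods"} <= set(products)
    }


print(sorted(airpods_right_after_iphone(transactions)))  # ['c1']
print(sorted(bought_both(transactions)))                 # ['c1', 'c2']
```

Customer `c2` shows why the two rules differ: they own both products but bought the AirPods first, so they only qualify for the second rule.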

___

- Settings -> Advanced -> search for DBFS -> turn on (Enable or disable DBFS browser)
    - once enabled, you can browse and manage files stored in DBFS via the Databricks UI

___

- Compute (create first cluster)
  - create compute 
     - compute name (`cluster1`)
       

![](imgs/cluster.png)

___

- catalog -> DBFS -> FileStore -> tables  -> upload

___

- workspace -> users -> user@gmail.com -> user@gmail.com -> dropdown -> create -> Folder (`appleAnalysis` )->create folder

- workspace -> users -> user@gmail.com -> user@gmail.com -> (`appleAnalysis`) -> create -> notebook(`apple_analysis`)

___

- check whether the cluster has been created

___

- catalog -> database tables -> default -> create table -> DBFS -> FileStore -> tables -> select (`customer_update.csv`) -> create table with UI (this creates the Delta table)
- select a cluster to preview the table
  - cluster1
- preview table
  - set first row as header
- create table
 

___

`apple_analysis.ipynb`

```python
# %run must be in a cell by itself; it pulls in the definitions from that notebook.
%run "./reader_factory"

%run "./transform"

class Workflow:
    def __init__(self):
        pass

    def runner(self):
        # Read the transactions CSV through the reader factory
        transactionInputDF = get_data_source(
            data_type="csv",
            file_path="dbfs:/FileStore/tables/Transaction_Updated.csv",
        ).get_data_frame()
        transactionInputDF.show()

        inputDFs = {"transactionInputDF": transactionInputDF}
        # Customers who bought AirPods after an iPhone
        # (LEAD, partitioned by customer_id, ordered by transaction_date ascending)
        firstTransform = FirstTransformer().transform(inputDFs)

# runner() returns nothing, so there is no result to assign
Workflow().runner()
```

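```python
from pyspark.sql import SparkSession

# Reuses the existing session on Databricks, or creates one elsewhere
spark = SparkSession.builder.appName("thebigdata.me").getOrCreate()

# Read the transactions CSV, treating the first row as the header
input_df = (
    spark
    .read
    .format("csv")
    .option("header", True)
    .load("dbfs:/FileStore/tables/Transaction_Updated.csv")
)
input_df.show()
```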


### create another notebook


`reader_factory.ipynb`

```python
class DataSource:
    """Base reader; each subclass knows how to load one file format."""

    def __init__(self, path):
        self.path = path

    def get_data_frame(self):
        raise NotImplementedError("get_data_frame must be implemented by a subclass")


class CSVDataSource(DataSource):
    def get_data_frame(self):
        return spark.read.format("csv").option("header", True).load(self.path)


class ParquetDataSource(DataSource):
    def get_data_frame(self):
        return spark.read.format("parquet").load(self.path)


class DeltaDataSource(DataSource):
    def get_data_frame(self):
        # load() expects the storage path of the Delta table;
        # for a registered table name, spark.read.table(name) would be used instead
        table_name = self.path
        return spark.read.format("delta").load(table_name)


def get_data_source(data_type, file_path):
    """Factory: return the reader that matches data_type."""
    if data_type == "csv":
        return CSVDataSource(file_path)
    elif data_type == "parquet":
        return ParquetDataSource(file_path)
    elif data_type == "delta":
        return DeltaDataSource(file_path)
    else:
        raise ValueError(f"Unsupported data type: {data_type}")
```



- a job in Spark represents a computation triggered by an action such as `count()`, `collect()`, or `save()`
- when you perform an action on a DataFrame or RDD, Spark submits a job
- a stage consists of a sequence of transformations that can be performed without shuffling the dataset across partitions
- stages are separated by transformations that require a shuffle, such as `groupBy()` or `reduceByKey()`

- each stage has its own set of tasks that execute the same code on different partitions of the dataset, and Spark tries to minimize shuffling between stages to optimize performance
- a task is the smallest unit of work in Spark; it represents the computation performed on a single partition of the dataset

- when Spark executes a stage, it splits the work into tasks, one per partition, and each task processes its slice of the data in parallel
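
The relationship between partitions, tasks, and the shuffle boundary between stages can be mimicked in plain Python. This is only a toy model, not how Spark is implemented: the partitions are ordinary lists, and `run_narrow_stage`, `shuffle_by_key`, and `sum_per_key` are hypothetical helper names.

```python
# Toy dataset of (key, value) pairs, already split into 2 partitions.
partitions = [
    [(1, 10), (2, 20), (1, 30)],
    [(2, 40), (3, 50)],
]


def run_narrow_stage(parts, fn):
    """One 'task' per partition: each applies fn to its own rows only."""
    return [[fn(row) for row in part] for part in parts]


def shuffle_by_key(parts, num_partitions):
    """Stage boundary: move rows so equal keys land in the same partition."""
    shuffled = [[] for _ in range(num_partitions)]
    for part in parts:
        for key, value in part:
            shuffled[key % num_partitions].append((key, value))
    return shuffled


def sum_per_key(part):
    """Task of the second stage: aggregate the keys found in one partition."""
    totals = {}
    for key, value in part:
        totals[key] = totals.get(key, 0) + value
    return totals


# Stage 1 (narrow): double every value; no row leaves its partition.
stage1 = run_narrow_stage(partitions, lambda kv: (kv[0], kv[1] * 2))
# Shuffle, then stage 2 (the wide part of a groupBy-style aggregation).
stage2_input = shuffle_by_key(stage1, num_partitions=2)
result = [sum_per_key(part) for part in stage2_input]
print(result)  # [{2: 120}, {1: 80, 3: 100}]
```

The doubling step needs no data movement, so it stays in one stage; the per-key sum is only correct once all rows for a key sit in the same partition, which is why the shuffle starts a new stage.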

___

`extractor.ipynb`

```python
class Extractor:
    def __init__(self):
        pass

    def extract(self):
        # Placeholder: extraction logic goes here
        pass
```

___
`transform.ipynb`

```python
from pyspark.sql.window import Window
from pyspark.sql.functions import lead, col


class Transformer:
    def __init__(self):
        pass

    def transform(self, inputDFs):
        pass


class FirstTransformer(Transformer):
    def transform(self, inputDFs):
        # The key must match the one used when the dictionary is built in apple_analysis
        transactionInputDF = inputDFs.get("transactionInputDF")
        print("transactionInputDF in transform")
        transactionInputDF.show()

        # For each customer, look at the next purchase in date order
        windowSpec = Window.partitionBy("customer_id").orderBy("transaction_date")
        transformedDF = transactionInputDF.withColumn(
            "next_product_name", lead("product_name").over(windowSpec)
        )
        print("airpod after iphone")
        transformedDF.orderBy("customer_id", "transaction_date", "product_name").show()

        # Keep iPhone purchases that are immediately followed by AirPods
        filteredDF = transformedDF.filter(
            (col("product_name") == "iPhone") & (col("next_product_name") == "AirPods")
        )
        filteredDF.orderBy("customer_id", "transaction_date", "product_name").show()
        # TODO: joinDF (the join step is still to be written)
        return filteredDF
```


`loader.ipynb`
```python
class Loader:
    def __init__(self):
        pass

    def sink(self):
        # Placeholder: write logic goes here
        pass
```


- `LEAD()` / `LAG()`
   - window functions (available in MySQL and in Spark SQL) that return the value from a following row (`LEAD`) or a preceding row (`LAG`) within the same partition

```python
# Location and format of the uploaded file
file_location = "/FileStore/tables/Customer_Updated.csv"
file_type = "csv"

# CSV options
infer_schema = "true"
first_row_is_header = "true"
delimiter = ","

df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)
```

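```python
# Register the DataFrame as a temporary view so it can be queried with SQL
temp_table_name = "customer_delta_table"
df.createOrReplaceTempView(temp_table_name)

# In a separate cell:
%sql
select * from `customer_delta_table`
```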

```python
permanent_table_name = "Customer_delta_table_persist"
# Writes Parquet as given; use format("delta") if a Delta table is intended
df.write.format("parquet").saveAsTable(permanent_table_name)
```
