# Prefect

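Prefect — это фреймворк с открытым исходным кодом для построения рабочих процессов на Python. Он позволяет легко создавать, запускать и контролировать конвейеры данных различного масштаба. В этом ноутбуке используется API Prefect 1.x (`Flow`, `Parameter`); в Prefect 2 и новее этот API удалён.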

In [1]:
%%capture
!pip install prefect

In [11]:
import pandas as pd
from prefect import task, Flow, Parameter
import sqlite3
import time

In [12]:
@task
def extract(current_url:str, current_path_dataset:str):
  """Download a CSV and store it as a parquet dataset partitioned by ``family_type``.

  Args:
    current_url: URL or path of the source CSV (``,`` separator, ``.`` decimal mark).
    current_path_dataset: Root directory of the partitioned parquet dataset.

  Returns:
    ``current_path_dataset``, so downstream tasks can take a data dependency
    on this task instead of only sharing the path parameter.
  """
  import os
  import shutil

  df = pd.read_csv(current_url, sep=',', decimal='.')
  # With partition_cols, pyarrow writes new uniquely-named files into the
  # existing directory and leaves old files in place, so a rerun would
  # duplicate every row. Start from a clean directory to keep reruns idempotent.
  if os.path.isdir(current_path_dataset):
    shutil.rmtree(current_path_dataset)
  df.to_parquet(current_path_dataset, index=False, partition_cols=['family_type'])
  return current_path_dataset

In [13]:
@task
def transform(current_path_dataset:str)->pd.DataFrame:
  """Read the parquet dataset and keep only the retailer and cheese-kind columns.

  Args:
    current_path_dataset: Path of the parquet dataset produced by ``extract``.

  Returns:
    DataFrame with columns ``retailer`` and ``kind_cheese`` (renamed from
    ``basic_retailer`` and ``favorite_kind_cheese``).
  """
  time.sleep(10)  # fixed 10-second pause before reading the dataset
  column_map = {'basic_retailer': 'retailer', 'favorite_kind_cheese': 'kind_cheese'}
  frame = pd.read_parquet(current_path_dataset)
  return frame[list(column_map)].rename(columns=column_map)

In [14]:
@task
def create_connect(db_path:str = "db.sqlite3") -> sqlite3.Connection:
  """Open a connection to a SQLite database file.

  Args:
    db_path: Database file to open (created by sqlite3 if missing).
      Defaults to ``"db.sqlite3"`` in the current working directory.

  Returns:
    An open ``sqlite3.Connection``. The caller owns it and is responsible for
    closing it. With sqlite3's default ``check_same_thread=True``, it may only
    be used from the thread that created it, which holds for a sequential executor.
  """
  connect = sqlite3.connect(db_path)
  return connect

In [15]:
@task
def load(current_df:pd.DataFrame, current_connect):
  """Count rows per (retailer, kind_cheese) pair and write them to table ``result``.

  Args:
    current_df: Frame with ``retailer`` and ``kind_cheese`` columns.
    current_connect: Open DB connection used as the ``to_sql`` target.

  Side effects:
    Replaces table ``result`` with columns ``retailer``, ``kind_cheese`` and
    ``total_amount``. Rows are sorted by retailer ascending, then by count
    descending.
  """
  time.sleep(10)  # fixed 10-second pause before aggregating
  group_keys = ['retailer', 'kind_cheese']
  summary = (
    current_df.groupby(by=group_keys)
    .size()
    .reset_index(name='total_amount')
    .sort_values(by=['retailer', 'total_amount'], ascending=[True, False])
  )
  summary.to_sql(name='result', if_exists='replace', index=False, con=current_connect)

In [16]:
@task
def query(current_sql:str, current_connect)->pd.DataFrame:
  """Run a SQL query, print the resulting frame and return it.

  Args:
    current_sql: SQL text passed to ``pd.read_sql``.
    current_connect: Open DB connection to run the query on.

  Returns:
    The query result as a DataFrame, matching the annotated return type, so
    downstream tasks and the task's result state receive the rows.
  """
  time.sleep(10)  # fixed 10-second pause before querying
  result = pd.read_sql(current_sql, current_connect)
  print(result)
  return result

In [17]:
with Flow("Prefect-ETL") as flow:
  # Runtime parameters; override them via flow.run(parameters={...}).
  url_file = Parameter("url_file", default="https://raw.githubusercontent.com/grishenkovp/project_cheese_market/main/project_v2/customers_info/customers_info_0.csv")
  path_dataset = Parameter("path_dataset", default='/content/dataset/dataset.parquet')
  sql = Parameter("sql", default= """select * from result""")

  # transform() only receives the path parameter and query() only the
  # connection, so neither has a data edge to the task it must follow.
  # Prefect could then schedule them first: transform would read a missing
  # dataset, and query a missing table. The explicit upstream_tasks enforce
  # extract -> transform and load -> query.
  extracted = extract(url_file, path_dataset)
  data = transform(path_dataset, upstream_tasks=[extracted])
  con = create_connect()
  loaded = load(data, con)
  query(sql, con, upstream_tasks=[loaded])

In [18]:
flow.run()  # execute the flow in this process; the returned State is displayed as the cell output

[2022-03-22 08:54:21+0000] INFO - prefect.FlowRunner | Beginning Flow run for 'Prefect-ETL'
[2022-03-22 08:54:21+0000] INFO - prefect.TaskRunner | Task 'url_file': Starting task run...
[2022-03-22 08:54:21+0000] INFO - prefect.TaskRunner | Task 'url_file': Finished task run for task with final state: 'Success'
[2022-03-22 08:54:21+0000] INFO - prefect.TaskRunner | Task 'create_connect': Starting task run...
[2022-03-22 08:54:21+0000] INFO - prefect.TaskRunner | Task 'create_connect': Finished task run for task with final state: 'Success'
[2022-03-22 08:54:21+0000] INFO - prefect.TaskRunner | Task 'path_dataset': Starting task run...
[2022-03-22 08:54:21+0000] INFO - prefect.TaskRunner | Task 'path_dataset': Finished task run for task with final state: 'Success'
[2022-03-22 08:54:21+0000] INFO - prefect.TaskRunner | Task 'sql': Starting task run...
[2022-03-22 08:54:21+0000] INFO - prefect.TaskRunner | Task 'sql': Finished task run for task with final state: 'Success'
[2022-03-22 08:54:

<Success: "All reference tasks succeeded.">