In [None]:
# Decorator that prints the execution time of a query (per the original note).
# NOTE(review): this cell contains only the decorator line, which is a
# SyntaxError on its own -- a decorator must sit directly above the `def` it
# wraps, e.g. `@print_timing` followed by `def run_query(...):`.
# `print_timing` itself is not defined anywhere in this view; define or import
# it before use.
@print_timing

In [None]:
# Inject Python values into a SQL query through bound parameters rather than
# string formatting: each %(name)s placeholder is filled from the `params`
# mapping by the database driver (pyformat paramstyle, presumably psycopg2 for
# the postgresql:// engine -- confirm the driver in use).
# NOTE(review): `db_engine`, `user_id` and `threshold` are not defined above
# this cell; `db_engine` is only created in the "Extracting data" section, so
# this cell fails on Restart & Run All unless that cell runs first.
query = """
SELECT title, rating FROM recommendations
INNER JOIN courses ON courses.course_id = recommendations.course_id
WHERE user_id = %(user_id)s AND rating > %(threshold)s
ORDER BY rating DESC
"""
query_params = dict(user_id=user_id, threshold=threshold)
predictions_df = pd.read_sql(query, db_engine, params=query_params)

Distributed computing

In [None]:
# multiprocessing.Pool API: mean athlete age per year, computed by a pool of
# worker processes on this machine (separate processes, not cluster nodes).
# NOTE(review): with the "spawn" start method (the default on Windows and
# macOS), workers cannot import functions defined inside a notebook; if the
# pool fails there, move take_mean_age into a .py module and import it.
from multiprocessing import Pool

def take_mean_age(year_and_group):
    """Return a one-row DataFrame holding the mean "Age" of one group.

    year_and_group: a ``(year, group_df)`` pair, as produced by iterating over
        ``DataFrame.groupby("Year")``.

    The single row is indexed by ``year`` so that the per-year results can be
    stacked back together with ``pd.concat``.
    """
    (year, group) = year_and_group
    mean_age = group["Age"].mean()
    return pd.DataFrame({"Age": mean_age}, index=[year])
  
# Iterating a groupby yields (year, group_df) pairs; Pool.map hands each pair
# to one of the 4 worker processes and returns the results in input order.
year_groups = athlete_events.groupby("Year")
with Pool(processes=4) as p:
    results = p.map(take_mean_age, year_groups)

# Stack the one-row-per-year frames into a single DataFrame indexed by year.
result_df = pd.concat(results)

In [None]:
# Dask: the same mean-age-per-year computation, with the pandas frame split
# into 4 partitions that Dask can process in parallel.
import dask.dataframe as dd

# Wrap the in-memory pandas DataFrame as a Dask DataFrame with 4 partitions.
athlete_events_dask = dd.from_pandas(athlete_events, npartitions=4)

# Build the lazy groupby-mean, then compute() it and print the resulting Series.
mean_age_per_year = athlete_events_dask.groupby('Year')['Age'].mean()
print(mean_age_per_year.compute())

In [None]:
# PySpark: inspect the schema, then compute the mean age per year.
# printSchema() and show() write to stdout themselves and return None, so
# wrapping them in print() only appended a stray "None" line to the output.
athlete_events_spark.printSchema()

athlete_events_spark.groupBy('Year').mean('Age').show()

Extracting data

In [None]:
# Data on the web through an API
import requests

# Fetch Hacker News item 16222426 from the public Firebase API.
# The timeout (seconds) keeps the cell from hanging forever on a stalled
# connection; raise_for_status() surfaces HTTP errors (4xx/5xx) immediately
# instead of failing later with a confusing JSON/KeyError.
resp = requests.get(
    "https://hacker-news.firebaseio.com/v0/item/16222426.json",
    timeout=10,
)
resp.raise_for_status()

# Parse the JSON body once and reuse it.
post = resp.json()
print(post)

# Assign the score of the post to post_score
post_score = post['score']
print(post_score)

In [None]:
# Data from databases with SQL
import sqlalchemy

# Connect to the database using a connection URI. URI formats:
#   postgresql://[user[:password]@][host][:port][/database]
#   sqlite:///data.db
# NOTE(review): the credentials are hardcoded; outside a throwaway demo, read
# them from an environment variable or a secrets store instead.
connection_uri = "postgresql://repl:password@localhost:5432/pagila"
db_engine = sqlalchemy.create_engine(connection_uri)

# Function to extract table to a pandas DataFrame
def extract_table_to_pandas(tablename, db_engine):
    """Load every row of ``tablename`` into a pandas DataFrame.

    tablename: name of the table to read; it is pasted verbatim into
        ``SELECT * FROM <tablename>``.
        NOTE(review): because the name is interpolated into the SQL text
        rather than bound as a parameter, only pass trusted, hard-coded names.
    db_engine: anything ``pd.read_sql`` accepts as its connection argument
        (a SQLAlchemy engine in this notebook).

    Returns the query result as a DataFrame.
    """
    return pd.read_sql(f"SELECT * FROM {tablename}", db_engine)

# Extract the film and customer tables into pandas DataFrames.
# Each result is kept in a variable: a bare call in the middle of a cell is
# neither displayed nor stored, so previously the film query ran and its
# result was simply thrown away. The transform cell further down works on
# `customer_df`.
film_df = extract_table_to_pandas('film', db_engine)
customer_df = extract_table_to_pandas('customer', db_engine)

# Preview rather than rendering the whole customer table in the output.
customer_df.head()

Transforming data with Pandas

In [None]:
# customer_df: pandas DataFrame with customer data, including an `email` column.
# Split each email into username and domain on the LAST '@': the domain part
# of an address cannot contain '@', and rsplit(n=1) yields at most two columns
# even for malformed values with several '@' (plain split("@") would put the
# middle piece into `domain`).
# If no value contains '@', expand=True produces only column 0; reindex
# guarantees that column 1 exists (filled with NaN) so the lookup below cannot
# raise KeyError.
split_email = (
    customer_df.email
    .str.rsplit("@", n=1, expand=True)
    .reindex(columns=[0, 1])
)

# Add the two parts as new columns: 0 = before '@', 1 = after '@'.
customer_df = customer_df.assign(
    username=split_email[0],
    domain=split_email[1],
)

Loading Data

In [None]:
# Write the pandas DataFrame to parquet (pandas overwrites an existing file).
film_pdf.to_parquet('films_pdf.parquet')

# Write the PySpark DataFrame to parquet. Spark's default save mode raises an
# error when the target path already exists, which made this cell fail on
# every re-run; "overwrite" makes it idempotent like the pandas write above.
film_sdf.write.mode("overwrite").parquet('films_sdf.parquet')

In [None]:
import sqlalchemy

# Target: the "dwh" database on the same server.
# NOTE(review): this rebinds `connection_uri` (previously the pagila URI) and
# hardcodes credentials; prefer a distinct name and an environment variable.
connection_uri = "postgresql://repl:password@localhost:5432/dwh"
db_engine_dwh = sqlalchemy.create_engine(connection_uri)

# Transform: join the recommendations onto the films (DataFrame.join aligns
# on the index by default).
film_pdf_joined = film_pdf.join(recommendations)

# Load: write the joined frame to store.film, replacing any existing table.
film_pdf_joined.to_sql(
    "film",
    db_engine_dwh,
    schema="store",
    if_exists="replace",
)