# Top 10 Libraries for Data Engineers

This notebook provides an overview of essential Python libraries for data engineers, including usage examples and links for further exploration.

## 1. Pandas

[Pandas](https://pandas.pydata.org/) is a library for data manipulation and analysis, offering DataFrame and Series data structures.

```python
import pandas as pd

# Creating a DataFrame
data = {'Name': ['John', 'Anna'], 'Age': [28, 22]}
df = pd.DataFrame(data)
print(df)


In [1]:
import pandas as pd

# Load a dataset
url = "https://raw.githubusercontent.com/datasets/population/main/data/population.csv"
df = pd.read_csv(url)
df

Unnamed: 0,Country Name,Country Code,Year,Value
0,Aruba,ABW,1960,54608
1,Aruba,ABW,1961,55811
2,Aruba,ABW,1962,56682
3,Aruba,ABW,1963,57475
4,Aruba,ABW,1964,58178
...,...,...,...,...
16395,Zimbabwe,ZWE,2017,14751101
16396,Zimbabwe,ZWE,2018,15052184
16397,Zimbabwe,ZWE,2019,15354608
16398,Zimbabwe,ZWE,2020,15669666


In [8]:
# Data cleaning
df = df.dropna()  # Remove missing values
df['Country Name'] = df['Country Name'].str.upper()  # Convert country names to uppercase
top_countries_by_population = df.sort_values(by='Value', ascending=False).head(10)

print(top_countries_by_population[['Country Name', 'Value']])

      Country Name       Value
16027        WORLD  7888408686
16026        WORLD  7820981524
16025        WORLD  7742681934
16024        WORLD  7661776338
16023        WORLD  7578157615
16022        WORLD  7491934113
16021        WORLD  7404910892
16020        WORLD  7317508753
16019        WORLD  7229184551
16018        WORLD  7140895722


## 2. NumPy
[NumPy](https://numpy.org/)  supports large, multi-dimensional arrays and matrices, along with a collection of mathematical functions.

In [3]:
import numpy as np

# Create a NumPy array
data = np.array([[1, 2, 3], [4, 5, 6], [7, 8, 9]])

# Element-wise operation: Add 10 to each element
transformed_data = data + 10
print("Data after adding 10 to each element:\n", transformed_data)

# Reshape the array to 1D
flattened_data = transformed_data.flatten()
print("Flattened data:\n", flattened_data)

# Calculate basic statistics
mean_value = np.mean(flattened_data)
median_value = np.median(flattened_data)
standard_deviation = np.std(flattened_data)

print("Mean value:", mean_value)
print("Median value:", median_value)
print("Standard Deviation:", standard_deviation)

# Find the maximum and minimum value and their indices in the flattened array
max_value = np.max(flattened_data)
min_value = np.min(flattened_data)
max_index = np.argmax(flattened_data)
min_index = np.argmin(flattened_data)

print("Maximum value:", max_value, "at index", max_index)
print("Minimum value:", min_value, "at index", min_index)

# Conditional selection: Find elements greater than 15
greater_than_15 = flattened_data[flattened_data > 15]
print("Elements greater than 15:", greater_than_15)


Data after adding 10 to each element:
 [[11 12 13]
 [14 15 16]
 [17 18 19]]
Flattened data:
 [11 12 13 14 15 16 17 18 19]
Mean value: 15.0
Median value: 15.0
Standard Deviation: 2.581988897471611
Maximum value: 19 at index 8
Minimum value: 11 at index 0
Elements greater than 15: [16 17 18 19]


## 3. Dask
[Dask](https://www.dask.org/) enables parallel computing, integrating with Pandas and NumPy.

In [4]:
import dask.dataframe as dd

# Create a Dask DataFrame
df = dd.read_csv('https://raw.githubusercontent.com/datasets/population/main/data/population.csv')

# Perform a parallel groupby operation
result = df.groupby(['Country Name']).Value.sum().compute()
print(result)


Country Name
Afghanistan                      1118707809
Africa Eastern and Southern     21450105617
Africa Western and Central      14614319338
Albania                           168147345
Algeria                          1587749994
                                   ...     
West Bank and Gaza                109094692
World                          332735496461
Yemen, Rep.                       963797876
Zambia                            550477100
Zimbabwe                          592721581
Name: Value, Length: 265, dtype: int64


## 4.Redis
[Redis](https://redis.io/) is an open-source, in-memory data structure store, used as a database, cache, and message broker. It supports data structures such as strings, hashes, lists, sets, sorted sets with range queries, bitmaps, hyperloglogs, geospatial indexes, and streams.

In [6]:
import redis
import time

r = redis.Redis(host='redis', port=6379, db=0)

def expensive_operation():
    # Simulate a time-consuming computation
    time.sleep(10)
    return 42

# Use Redis to cache the result
cache_key = "expensive_result"
if r.exists(cache_key):
    result = r.get(cache_key)
else:
    result = expensive_operation()
    # Cache the result for 10 seconds
    r.setex(cache_key, 10, result)

print(f"Result: {result}")


Result: b'42'


## 5. Apache Airflow
Apache Airflow automates and schedules workflows as DAGs of tasks.

In [None]:
# TBD

## 6. Luigi
Luigi manages complex pipelines of batch jobs.

In [None]:
# TBD

## 7. SQLAlchemy
SQLAlchemy is a SQL toolkit and ORM for Python.

In [7]:
from sqlalchemy import create_engine, Column, Integer, String, Sequence
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

# Define a base class for declarative class definitions
Base = declarative_base()

# Define a User class that represents a table in the database
class User(Base):
    __tablename__ = 'users'
    id = Column(Integer, Sequence('user_id_seq'), primary_key=True)
    name = Column(String(50))
    age = Column(Integer)

    def __repr__(self):
        return f"<User(name={self.name}, age={self.age})>"

# Create an in-memory SQLite database engine
engine = create_engine('sqlite:///:memory:', echo=True)

# Create the table defined by the User class
Base.metadata.create_all(engine)

# Create a session to interact with the database
Session = sessionmaker(bind=engine)
session = Session()

# Add a new user to the table
new_user = User(name='John Doe', age=30)
session.add(new_user)
session.commit()

# Query the user table
user = session.query(User).filter_by(name='John Doe').first()
print(user)

# Add multiple users
session.add_all([
    User(name='Jane Doe', age=25),
    User(name='Jim Beam', age=40)
])
session.commit()

# Query all users
all_users = session.query(User).all()
for user in all_users:
    print(user)


2024-04-01 17:44:13,734 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-04-01 17:44:13,735 INFO sqlalchemy.engine.Engine PRAGMA main.table_info("users")
2024-04-01 17:44:13,736 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-04-01 17:44:13,738 INFO sqlalchemy.engine.Engine PRAGMA temp.table_info("users")
2024-04-01 17:44:13,739 INFO sqlalchemy.engine.Engine [raw sql] ()
2024-04-01 17:44:13,741 INFO sqlalchemy.engine.Engine 
CREATE TABLE users (
	id INTEGER NOT NULL, 
	name VARCHAR(50), 
	age INTEGER, 
	PRIMARY KEY (id)
)


2024-04-01 17:44:13,742 INFO sqlalchemy.engine.Engine [no key 0.00101s] ()
2024-04-01 17:44:13,743 INFO sqlalchemy.engine.Engine COMMIT
2024-04-01 17:44:13,746 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2024-04-01 17:44:13,750 INFO sqlalchemy.engine.Engine INSERT INTO users (name, age) VALUES (?, ?)
2024-04-01 17:44:13,752 INFO sqlalchemy.engine.Engine [generated in 0.00194s] ('John Doe', 30)
2024-04-01 17:44:13,753 INFO sqlalchemy.engine.Engine COMMIT
2024-0

  Base = declarative_base()


## 8 pytest
[pytest](https://docs.pytest.org/en/8.0.x/) is a framework that makes it easy to write simple tests, yet scales to support complex functional testing.

In [11]:
# Example pytest test case
def test_addition():
    assert 1 + 1 == 2

# Run this test in the terminal using: pytest <filename>.py


## 9. PyArrow
PyArrow facilitates efficient data handling.

In [8]:
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Create a PyArrow Array
data = pa.array([1, 2, 3, 4, 5])
print("Array:", data)

# Create a Table
data = pa.Table.from_pandas(pd.DataFrame({'a': [1, 2, 3, 4, 5],
                                          'b': ['A', 'B', 'C', 'D', 'E']}))
print("Table:\n", data)

# Save the table to a Parquet file
pq.write_table(data, 'example_table.parquet')

# Read the table back from the Parquet file
table_from_parquet = pq.read_table('example_table.parquet')
print("Table read from Parquet:\n", table_from_parquet)

# Convert the table back to Pandas DataFrame
df_from_parquet = table_from_parquet.to_pandas()
print("DataFrame from Parquet:\n", df_from_parquet)


Array: [
  1,
  2,
  3,
  4,
  5
]
Table:
 pyarrow.Table
a: int64
b: string
----
a: [[1,2,3,4,5]]
b: [["A","B","C","D","E"]]
Table read from Parquet:
 pyarrow.Table
a: int64
b: string
----
a: [[1,2,3,4,5]]
b: [["A","B","C","D","E"]]
DataFrame from Parquet:
    a  b
0  1  A
1  2  B
2  3  C
3  4  D
4  5  E


## 10. Great Expectations
[Great Expectations](https://docs.greatexpectations.io/docs/oss/tutorials/quickstart/)
  ensures data meets quality standards.

In [11]:
import great_expectations as ge
from great_expectations.dataset import PandasDataset

# Load your data into a Pandas DataFrame
data = {
    "name": ["John", "Jane", "Doe", None],
    "age": [28, 33, 23, 29]
}
df = ge.dataset.PandasDataset(data)

# Set up expectations
df.expect_column_values_to_not_be_null(column="name")
df.expect_column_values_to_be_of_type(column="age", type_="int")
df.expect_column_values_to_be_in_set(column="name", value_set=["John", "Jane", "Doe", "Alice"])

# Validate the data against the expectations
results = df.validate()

# Print validation results
# print(results)

# Save the expectation suite for later use
df.save_expectation_suite("my_expectation_suite.json")

# To validate another dataset against saved expectations:
suite = df.get_expectation_suite()

# Load a new dataset
new_data = {
    "name": ["Alice", "Bob", "Charlie"],
    "age": [25, 30, 35]
}
new_df = ge.dataset.PandasDataset(new_data, expectation_suite=suite)

# Validate the new dataset
new_results = new_df.validate()
print(new_results)


{
  "success": false,
  "results": [
    {
      "success": true,
      "expectation_config": {
        "expectation_type": "expect_column_values_to_be_of_type",
        "kwargs": {
          "column": "age",
          "type_": "int"
        },
        "meta": {}
      },
      "result": {
        "observed_value": "int64"
      },
      "meta": {},
      "exception_info": {
        "raised_exception": false,
        "exception_message": null,
        "exception_traceback": null
      }
    },
    {
      "success": false,
      "expectation_config": {
        "expectation_type": "expect_column_values_to_be_in_set",
        "kwargs": {
          "column": "name",
          "value_set": [
            "John",
            "Jane",
            "Doe",
            "Alice"
          ]
        },
        "meta": {}
      },
      "result": {
        "element_count": 3,
        "missing_count": 0,
        "missing_percent": 0.0,
        "unexpected_count": 2,
        "unexpected_percent": 66.666