In [None]:
import polars as pl

# Build two small Polars DataFrames that share an "id" key.
data1 = {"id": [1, 2, 3, 4],
         "age": [25, 30, 35, 40],
         "salary": [50000, 55000, 60000, 65000]}
data2 = {"id": [1, 2, 3, 4],
         "city": ["New York", "San Francisco", "Los Angeles", "Chicago"]}

df1_polars = pl.DataFrame(data1)
df2_polars = pl.DataFrame(data2)

# Pipeline: select -> filter -> rename -> inner join -> conditional flag column.
selected_df = df1_polars.select(["id", "salary"])
filtered_df = selected_df.filter(pl.col("salary") > 50000)  # strict: salary == 50000 is dropped
renamed_df = filtered_df.rename({"salary": "income"})
joined_df = renamed_df.join(df2_polars, on="id", how="inner")
# Use a column expression (pl.col) rather than the eager Series joined_df["income"]:
# the condition is then resolved against the frame being transformed and also
# works unchanged in a LazyFrame pipeline.
conditional_df = joined_df.with_columns(
    pl.when(pl.col("income") > 60000).then(1).otherwise(0).alias("high_income")
)
# Apply UDF
def salary_increase(salary: int, amount: int = 5000) -> int:
    """Return ``salary`` raised by a flat ``amount``.

    The body only uses ``+``, so it also works element-wise on anything that
    supports addition with an int -- e.g. a Polars expression such as
    ``pl.col("income")``, where it builds the vectorized expression
    ``pl.col("income") + amount`` instead of needing a per-row Python UDF.

    Args:
        salary: Current salary (or a column expression of salaries).
        amount: Flat raise to add; defaults to 5000, the original hard-coded value.

    Returns:
        ``salary + amount``.
    """
    return salary + amount

# Apply the raise. `Expr.apply` is deprecated (renamed `map_elements`) and runs a
# slow per-row Python call. Because `salary_increase` only uses `+`, calling it on
# the column expression yields the native vectorized `pl.col("income") + 5000`,
# which is what the Polars performance warning recommends.
udf_applied_df = conditional_df.with_columns(
    salary_increase(pl.col("income")).alias("increased_income")
)

# Group-by aggregation (not a window function): mean incomes per city.
# `group_by` replaces the deprecated `groupby` (Polars >= 0.19).
grouped_df = udf_applied_df.group_by("city")
ranked_df = grouped_df.agg(
    pl.col("income").mean().alias("average_income"),
    pl.col("increased_income").mean().alias("average_increased_income"),
)
# group_by does not guarantee output row order, so sort explicitly.
sorted_df = ranked_df.sort("average_income", descending=True)

# Show the resulting DataFrame (rich display as the cell's last expression).
sorted_df

shape: (3, 3)
┌───────────────┬────────────────┬──────────────────────────┐
│ city          ┆ average_income ┆ average_increased_income │
│ ---           ┆ ---            ┆ ---                      │
│ str           ┆ f64            ┆ f64                      │
╞═══════════════╪════════════════╪══════════════════════════╡
│ Chicago       ┆ 65000.0        ┆ 70000.0                  │
│ Los Angeles   ┆ 60000.0        ┆ 65000.0                  │
│ San Francisco ┆ 55000.0        ┆ 60000.0                  │
└───────────────┴────────────────┴──────────────────────────┘


  udf_applied_df = conditional_df.with_columns(pl.col("income").apply(salary_increase).alias("increased_income"))
Expr.map_elements is significantly slower than the native expressions API.
Only use if you absolutely CANNOT implement your logic otherwise.
Replace this expression...
  - pl.col("income").map_elements(salary_increase)
with this one instead:
  + pl.col("income") + 5000

  udf_applied_df = conditional_df.with_columns(pl.col("income").apply(salary_increase).alias("increased_income"))
  grouped_df = udf_applied_df.groupby("city")


In [None]:
import timeit
import random
import string

# Synthetic data: one million rows for this experiment.
# Seed the RNG so every run benchmarks exactly the same data.
SEED = 42
random.seed(SEED)

num_rows = 1_000_000
CITIES = ["New York", "San Francisco", "Los Angeles", "Chicago"]
ages = [random.randint(18, 65) for _ in range(num_rows)]
salaries = [random.randint(30000, 200000) for _ in range(num_rows)]
cities = [random.choice(CITIES) for _ in range(num_rows)]

# Row-oriented records (list of dicts) with 1-based ids; both tables share `id`.
data1 = [{"id": i, "age": age, "salary": salary}
         for i, (age, salary) in enumerate(zip(ages, salaries), start=1)]
data2 = [{"id": i, "city": city} for i, city in enumerate(cities, start=1)]

def pyspark_benchmark():
    """PySpark version of the benchmark pipeline; shows the per-city result.

    Inner-joins the module-level ``data1`` and ``data2`` records on ``id``,
    adds a 0/1 ``high_income`` flag (salary > 100000) and a ``row_number``
    ordered by ``id``, then shows the mean salary per city in descending
    order (at most 10 rows). The SparkSession is obtained with
    ``getOrCreate()`` inside this call, so a timing of the call includes
    session start-up when no session exists yet.
    """
    from pyspark.sql import Row, SparkSession
    from pyspark.sql.functions import avg, col, when, row_number
    from pyspark.sql.window import Window

    session = SparkSession.builder.appName("PySpark Benchmark").getOrCreate()
    people = session.createDataFrame([Row(**record) for record in data1])
    locations = session.createDataFrame([Row(**record) for record in data2])

    # Un-partitioned window: a single global ordering by id.
    by_id = Window.orderBy("id")
    top_cities = (
        people.join(locations, on="id", how="inner")
        .withColumn("high_income", when(col("salary") > 100000, 1).otherwise(0))
        .withColumn("rank", row_number().over(by_id))
        .groupBy("city")
        .agg(avg("salary").alias("average_salary"))
        .orderBy("average_salary", ascending=False)
        .limit(10)
    )

    top_cities.show()

def polars_benchmark():
    """Polars version of the benchmark pipeline; prints the per-city result.

    Mirrors the PySpark and pandas versions: inner-join the module-level
    ``data1`` and ``data2`` records on ``id``, add a 0/1 ``high_income`` flag
    (salary > 100000), add a global row rank ordered by ``id``, then print the
    mean salary per city in descending order (at most 10 rows).
    """
    import polars as pl

    df1_polars = pl.DataFrame(data1)
    df2_polars = pl.DataFrame(data2)

    joined_df = df1_polars.join(df2_polars, on="id", how="inner")
    conditional_df = joined_df.with_columns(
        pl.when(pl.col("salary") > 100000).then(1).otherwise(0).alias("high_income")
    )
    # Global ordinal rank by id, equivalent to Spark's row_number() over
    # Window.orderBy("id") and pandas' rank(method="first"). The previous
    # `.rank().over("id")` ranked inside each id partition; ids are unique,
    # so every row got rank 1 and the work was not comparable.
    ranked_df = conditional_df.with_columns(
        pl.col("id").rank(method="ordinal").alias("rank")
    )
    # `group_by` replaces the deprecated `groupby` (Polars >= 0.19).
    result_df = (ranked_df.group_by("city")
                 .agg(pl.col("salary").mean().alias("average_salary"))
                 .sort("average_salary", descending=True)
                 .head(10))

    print(result_df)

def pandas_benchmark():
    """Pandas version of the benchmark pipeline; prints the per-city result.

    Inner-joins the module-level ``data1`` and ``data2`` records on ``id``,
    flags salaries above 100000 as ``high_income`` (0/1), numbers rows by
    ``id`` with ``rank(method="first")``, then prints the mean salary per
    city in descending order (at most 10 rows).
    """
    import pandas as pd

    people = pd.DataFrame(data1)
    locations = pd.DataFrame(data2)

    merged = people.merge(locations, on="id", how="inner")
    merged["high_income"] = merged["salary"].gt(100000).astype(int)
    merged["rank"] = merged["id"].rank(method="first")

    summary = (
        merged.groupby("city")
        .agg(average_salary=("salary", "mean"))
        .sort_values("average_salary", ascending=False)
        .head(10)
    )

    print(summary)

# Benchmarking: time a single call of each pipeline (number=1), in the order
# PySpark, Polars, Pandas. Each benchmark function builds its frames and runs its
# imports inside the call, so the figures include those one-off costs (and, for
# PySpark, SparkSession start-up via getOrCreate()).
pyspark_time, polars_time, pandas_time = [
    timeit.timeit(call, globals=globals(), number=1)
    for call in ("pyspark_benchmark()", "polars_benchmark()", "pandas_benchmark()")
]

print(f"PySpark execution time: {pyspark_time:.2f} seconds")
print(f"Polars execution time: {polars_time:.2f} seconds")
print(f"Pandas execution time: {pandas_time:.2f} seconds")


+-------------+------------------+
|         city|    average_salary|
+-------------+------------------+
|  Los Angeles|115207.90792036832|
|San Francisco|115036.09843499839|
|      Chicago|114997.19800868603|
|     New York|114962.42313264392|
+-------------+------------------+



  result_df = (ranked_df.groupby("city")


shape: (4, 2)
┌───────────────┬────────────────┐
│ city          ┆ average_salary │
│ ---           ┆ ---            │
│ str           ┆ f64            │
╞═══════════════╪════════════════╡
│ Los Angeles   ┆ 115207.90792   │
│ San Francisco ┆ 115036.098435  │
│ Chicago       ┆ 114997.198009  │
│ New York      ┆ 114962.423133  │
└───────────────┴────────────────┘
               average_salary
city                         
Los Angeles     115207.907920
San Francisco   115036.098435
Chicago         114997.198009
New York        114962.423133
PySpark execution time: 63.13 seconds
Polars execution time: 3.75 seconds
Pandas execution time: 2.42 seconds


In [None]:
!pip install pyspark polar

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m1.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting polar
  Downloading polar-0.0.127.tar.gz (11 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting python-pptx (from polar)
  Downloading python_pptx-1.0.2-py3-none-any.whl.metadata (2.5 kB)
Collecting imblearn (from polar)
  Downloading imblearn-0.0-py2.py3-none-any.whl.metadata (355 bytes)
Collecting XlsxWriter>=0.5.7 (from python-pptx->polar)
  Downloading XlsxWriter-3.2.0-py3-none-any.whl.metadata (2.6 kB)
Downloading imblearn-0.0-py2.py3-none-any.whl (1.9 kB)
Downloading python_pptx-1.0.2-py3-none-any.whl (472 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m472.8/472.8 kB[0m [31m30.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading XlsxWriter-3.2.0-py3-none-any.whl (159 kB)
[2K   [90m━━━━━━━━━━

In [None]:
import pandas as pd

# Display the installed pandas version string as the cell output.
pd.__version__

'2.1.4'

In [None]:
# Print pandas' environment report (Python, OS, and dependency versions) as plain
# text; plain text (as_json=False) is the default output mode.
pd.show_versions()




INSTALLED VERSIONS
------------------
commit              : a671b5a8bf5dd13fb19f0e88edc679bc9e15c673
python              : 3.10.12.final.0
python-bits         : 64
OS                  : Linux
OS-release          : 6.1.85+
Version             : #1 SMP PREEMPT_DYNAMIC Thu Jun 27 21:05:47 UTC 2024
machine             : x86_64
processor           : x86_64
byteorder           : little
LC_ALL              : en_US.UTF-8
LANG                : en_US.UTF-8
LOCALE              : en_US.UTF-8

pandas              : 2.1.4
numpy               : 1.26.4
pytz                : 2024.1
dateutil            : 2.8.2
setuptools          : 71.0.4
pip                 : 24.1.2
Cython              : 3.0.11
pytest              : 7.4.4
hypothesis          : None
sphinx              : 5.0.2
blosc               : None
feather             : None
xlsxwriter          : None
lxml.etree          : 4.9.4
html5lib            : 1.1
pymysql             : None
psycopg2            : 2.9.9
jinja2              : 3.1.4
IPython    

In [None]:
pip list

Package                          Version
-------------------------------- ---------------------
absl-py                          1.4.0
accelerate                       0.32.1
aiohappyeyeballs                 2.3.5
aiohttp                          3.10.2
aiosignal                        1.3.1
alabaster                        0.7.16
albucore                         0.0.13
albumentations                   1.4.13
altair                           4.2.2
annotated-types                  0.7.0
anyio                            3.7.1
argon2-cffi                      23.1.0
argon2-cffi-bindings             21.2.0
array_record                     0.5.1
arviz                            0.18.0
asn1crypto                       1.5.1
astropy                          6.1.2
astropy-iers-data                0.2024.8.5.0.32.23
astunparse                       1.6.3
async-timeout                    4.0.3
atpublic                         4.1.0
attrs                            24.2.0
audioread               

In [None]:
# List installed packages whose name contains "pandas" (Linux/macOS shell).
# On Windows (no grep), use instead: pip freeze | findstr pandas
!pip freeze | grep pandas

geopandas==0.14.4
pandas==2.1.4
pandas-datareader==0.10.0
pandas-gbq==0.23.1
pandas-stubs==2.1.4.231227
sklearn-pandas==2.2.0


In [None]:
!pip freeze | grep polar

polars==0.20.2


In [None]:
!pip install "dask[complete]"

Collecting lz4>=4.3.2 (from dask[complete])
  Downloading lz4-4.3.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.7 kB)
Collecting dask-expr<1.2,>=1.1 (from dask[complete])
  Downloading dask_expr-1.1.11-py3-none-any.whl.metadata (2.5 kB)
INFO: pip is looking at multiple versions of dask-expr to determine which version is compatible with other requirements. This could take a while.
  Downloading dask_expr-1.1.10-py3-none-any.whl.metadata (2.5 kB)
  Downloading dask_expr-1.1.9-py3-none-any.whl.metadata (2.5 kB)
Downloading lz4-4.3.3-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.3 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m1.3/1.3 MB[0m [31m5.9 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading dask_expr-1.1.9-py3-none-any.whl (241 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m241.9/241.9 kB[0m [31m11.3 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: lz4, dask-expr
Successfully install

In [None]:
# Bước 1: Cài đặt các thư viện cần thiết
!pip install dask distributed bokeh ngrok jupyter-server-proxy pyngrok

Collecting jupyter-server-proxy
  Downloading jupyter_server_proxy-4.3.0-py3-none-any.whl.metadata (8.7 kB)
Collecting pyngrok
  Downloading pyngrok-7.2.0-py3-none-any.whl.metadata (7.4 kB)
Collecting simpervisor>=1.0.0 (from jupyter-server-proxy)
  Downloading simpervisor-1.0.0-py3-none-any.whl.metadata (4.3 kB)
Downloading jupyter_server_proxy-4.3.0-py3-none-any.whl (36 kB)
Downloading pyngrok-7.2.0-py3-none-any.whl (22 kB)
Downloading simpervisor-1.0.0-py3-none-any.whl (8.3 kB)
Installing collected packages: simpervisor, pyngrok, jupyter-server-proxy
Successfully installed jupyter-server-proxy-4.3.0 pyngrok-7.2.0 simpervisor-1.0.0


In [None]:
# Step 2: Set up ONE local Dask cluster and a client connected to it.
import os

from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
import numpy as np
import pandas as pd

cluster = LocalCluster()
client = Client(cluster)

# Step 3: Build a simple Dask DataFrame (seeded so the result is reproducible).
rng = np.random.default_rng(42)
df = dd.from_pandas(
    pd.DataFrame(rng.standard_normal((1_000_000, 4)), columns=['A', 'B', 'C', 'D']),
    npartitions=10,
)

# Step 4: Run a computation on the cluster.
result = df.mean().compute()
print(result)

# Step 5: The scheduler of the cluster above already serves the Bokeh dashboard.
# Do NOT call `Client()` again here: without an address it starts a second
# LocalCluster (hence "Perhaps you already have a cluster running?").
import bokeh  # not used directly below

# Step 6: Expose the dashboard through ngrok.
from pyngrok import ngrok

# Look the dashboard port up by its service name rather than by dict position.
bokeh_port = client.scheduler_info()['services']['dashboard']
print(f"Bokeh đang chạy trên cổng: {bokeh_port}")

# ngrok requires a verified account and authtoken (ERR_NGROK_4018 otherwise);
# read it from the environment instead of hard-coding a secret in the notebook.
ngrok_token = os.environ.get("NGROK_AUTHTOKEN")
if ngrok_token:
    ngrok.set_auth_token(ngrok_token)
    tunnel = ngrok.connect(bokeh_port)
    public_url = tunnel.public_url
    print(f"URL công khai cho giao diện Dask: {public_url}")
else:
    public_url = None
    print("NGROK_AUTHTOKEN is not set; skipping the ngrok tunnel. "
          f"Local dashboard: {client.dashboard_link}")

# No blocking input() here: it would stall "Restart & Run All". The cluster and
# tunnel objects stay referenced in the kernel namespace after the cell finishes.

Perhaps you already have a cluster running?
Hosting the HTTP server on port 46663 instead
INFO:distributed.scheduler:State start
INFO:distributed.scheduler:  Scheduler at:     tcp://127.0.0.1:41867
INFO:distributed.scheduler:  dashboard at:  http://127.0.0.1:46663/status
INFO:distributed.scheduler:Registering Worker plugin shuffle
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:46127'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:38409'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:40699', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:40699
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:38974
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:43057', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:43057
INFO:distributed.core:St

A    0.000913
B   -0.000668
C    0.000374
D   -0.000373
dtype: float64


INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:38867'
INFO:distributed.nanny:        Start Nanny at: 'tcp://127.0.0.1:43241'
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:36575', name: 0, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:36575
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:41042
INFO:distributed.scheduler:Register worker <WorkerState 'tcp://127.0.0.1:39151', name: 1, status: init, memory: 0, processing: 0>
INFO:distributed.scheduler:Starting worker compute stream, tcp://127.0.0.1:39151
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:41058
INFO:distributed.scheduler:Receive client connection: Client-4fb71cb9-5c57-11ef-82ce-0242ac1c000c
INFO:distributed.core:Starting established connection to tcp://127.0.0.1:41068


Bokeh đang chạy trên cổng: 37747


ERROR:pyngrok.process.ngrok:t=2024-08-17T05:12:40+0000 lvl=eror msg="failed to reconnect session" obj=tunnels.session err="authentication failed: Usage of ngrok requires a verified account and authtoken.\n\nSign up for an account: https://dashboard.ngrok.com/signup\nInstall your authtoken: https://dashboard.ngrok.com/get-started/your-authtoken\r\n\r\nERR_NGROK_4018\r\n"
ERROR:pyngrok.process.ngrok:t=2024-08-17T05:12:40+0000 lvl=eror msg="session closing" obj=tunnels.session err="authentication failed: Usage of ngrok requires a verified account and authtoken.\n\nSign up for an account: https://dashboard.ngrok.com/signup\nInstall your authtoken: https://dashboard.ngrok.com/get-started/your-authtoken\r\n\r\nERR_NGROK_4018\r\n"


PyngrokNgrokError: The ngrok process errored on start: authentication failed: Usage of ngrok requires a verified account and authtoken.\n\nSign up for an account: https://dashboard.ngrok.com/signup\nInstall your authtoken: https://dashboard.ngrok.com/get-started/your-authtoken\r\n\r\nERR_NGROK_4018\r\n.