pandas is a Python package, widely used by data scientists, that provides easy-to-use data structures and data analysis tools. However, pandas runs on a single machine and does not scale out to big data. Pandas API on Spark fills this gap by providing pandas-equivalent APIs that run on Apache Spark. It is useful not only for pandas users but also for PySpark users, because it supports many tasks that are difficult to do with PySpark alone, for example plotting data directly from a PySpark DataFrame.

- If you use pandas but are not familiar with Spark, you can start working with Spark right away, without first learning the PySpark API.

- You can keep a single codebase for small data and big data, whether it runs on a single machine or on a distributed cluster.

- You can run your pandas code faster on large datasets, because Spark distributes the work across cores and machines.

![comparison with dask](https://www.databricks.com/wp-content/uploads/2021/04/koalas-blog-img-resampled-1.jpg)

![with filtering](https://www.databricks.com/wp-content/uploads/2021/04/koalas-blog-img-resampled-2.jpg)
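The single-codebase point above can be sketched as a function written only against the shared DataFrame API. The function name `mean_budget_by_marketplace` and the sample rows are made up for illustration. Plain pandas is used so the sketch runs on one machine; the assumption is that the same function would accept a pandas-on-Spark DataFrame (for example one built with `ps.from_pandas`) unchanged.

```python
import pandas as pd


def mean_budget_by_marketplace(df):
    """Average budget per marketplace, using only calls that pandas and
    pandas API on Spark both provide (groupby, mean, sort_index)."""
    return df.groupby("marketplace_id")["budget"].mean().sort_index()


# Hypothetical sample rows; the column names follow the dataset used later on.
sample = pd.DataFrame(
    {
        "marketplace_id": ["ATVPDKIKX0DER", "ATVPDKIKX0DER", "A2Q3Y263D00KWC"],
        "budget": [100.0, 50.0, 20.0],
    }
)

result = mean_budget_by_marketplace(sample)
print(result)
```

Keeping such functions free of engine-specific calls is what lets the choice between pandas and Spark be made at the point where the data is loaded.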

In [0]:
import pyspark.pandas as ps

In [0]:
dir(ps)

Out[2]: ['CategoricalIndex',
 'DataFrame',
 'DatetimeIndex',
 'Float64Index',
 'Index',
 'Int64Index',
 'LooseVersion',
 'MultiIndex',
 'NamedAgg',
 'Series',
 'TaskContext',
 'TimedeltaIndex',
 '__all__',
 '__builtins__',
 '__cached__',
 '__doc__',
 '__file__',
 '__loader__',
 '__name__',
 '__package__',
 '__path__',
 '__spec__',
 '_auto_patch_pandas',
 '_auto_patch_spark',
 '_frame_has_class_getitem',
 '_series_has_class_getitem',
 '_typing',
 'accessors',
 'base',
 'broadcast',
 'categorical',
 'concat',
 'config',
 'data_type_ops',
 'date_range',
 'datetimes',
 'exceptions',
 'frame',
 'from_pandas',
 'generic',
 'get_dummies',
 'get_option',
 'groupby',
 'indexes',
 'indexing',
 'internal',
 'isna',
 'isnull',
 'melt',
 'merge',
 'merge_asof',
 'missing',
 'ml',
 'mlflow',
 'namespace',
 'notna',
 'notnull',
 'option_context',
 'options',
 'os',
 'plot',
 'pyarrow',
 'range',
 'read_clipboard',
 'read_csv',
 'read_delta',
 'read_excel',
 'read_html',
 'read_json',
 'read_orc',
 'r

In [0]:
dir(ps.options)

Out[3]: ['compute.default_index_type',
 'compute.eager_check',
 'compute.isin_limit',
 'compute.max_rows',
 'compute.ops_on_diff_frames',
 'compute.ordered_head',
 'compute.shortcut_limit',
 'display.max_rows',
 'plotting.backend',
 'plotting.max_rows',
 'plotting.sample_ratio']

- `pandas.core.frame.DataFrame`: lives on a single machine

- `pyspark.pandas.frame.DataFrame`: distributed across the Spark cluster

In [0]:
import pyspark.pandas as ps
import pandas as pd
import numpy as np

df_ps = ps.read_delta("/mnt/datadrop-test/fw_fish/stream_bronze/budget_file_name_date_oct_1to15_2022/")
type(df_ps)



Out[5]: pyspark.pandas.frame.DataFrame

In [0]:
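# Read the same location with plain pandas through the /dbfs mount.
# Note: this reads the Parquet files directly and ignores the Delta transaction log.
df_pd = pd.read_parquet('/dbfs/mnt/datadrop-test/fw_fish/stream_bronze/budget_file_name_date_oct_1to15_2022/')
type(df_pd)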

Out[6]: pandas.core.frame.DataFrame

In [0]:
df_ps_2_pd = df_ps.to_pandas()
type(df_ps_2_pd)

Out[7]: pandas.core.frame.DataFrame

In [0]:
df_pd_2_ps = ps.from_pandas(df_pd)
type(df_pd_2_ps)

Out[9]: pyspark.pandas.frame.DataFrame

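Note that on a multi-machine cluster, converting a pandas-on-Spark DataFrame to a pandas DataFrame with `to_pandas()` transfers all of the data from the worker machines to a single machine (the driver), so the result must fit in that machine's memory. Converting in the other direction with `ps.from_pandas()` distributes the data from that single machine across the cluster.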

In [0]:
df_spark = df_ps.to_spark()
type(df_spark)

Out[11]: pyspark.sql.dataframe.DataFrame

In [0]:
df_ps_new = df_spark.pandas_api()
type(df_ps_new)

Out[13]: pyspark.pandas.frame.DataFrame

In [0]:
df_ps.columns

Out[38]: Index(['advertiser_id', 'advertising_product_type', 'budget',
       'budget_scope_id', 'budget_scope_type', 'budget_usage_percentage',
       'dataset_id', 'marketplace_id', 'usage_updated_timestamp',
       'file_date_time', 'usage_updated_timestamp2', 'file_date_time2',
       'minutes_diff'],
      dtype='object')

In [0]:
df_ps.dtypes

Out[14]: advertiser_id                       object
advertising_product_type            object
budget                             float64
budget_scope_id                     object
budget_scope_type                   object
budget_usage_percentage            float64
dataset_id                          object
marketplace_id                      object
usage_updated_timestamp             object
file_date_time                      object
usage_updated_timestamp2    datetime64[ns]
file_date_time2             datetime64[ns]
minutes_diff                       float64
dtype: object

In [0]:
df_ps.head()

| | advertiser_id | advertising_product_type | budget | budget_scope_id | budget_scope_type | budget_usage_percentage | dataset_id | marketplace_id | usage_updated_timestamp | file_date_time | usage_updated_timestamp2 | file_date_time2 | minutes_diff |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 2965762 | ENTITY32FCMYHLD5TT5 | sp | 389.96 | 146967882456693 | CAMPAIGN | 55.26 | budget-usage | ATVPDKIKX0DER | 2022-10-11T16:25:15Z | 2022-10-11-16-30-09 | 2022-10-11 16:25:15 | 2022-10-11 16:30:09 | 4.9 |
| 2965763 | ENTITY5U0BYY2U38HR | sp | 374.4 | 277169250999342 | CAMPAIGN | 85.18 | budget-usage | ATVPDKIKX0DER | 2022-10-11T16:26:02Z | 2022-10-11-16-30-09 | 2022-10-11 16:26:02 | 2022-10-11 16:30:09 | 4.12 |
| 2965764 | ENTITY2ZVDWJCGA7N7B | sp | 56.3 | 147940483039430 | CAMPAIGN | 8.47 | budget-usage | ATVPDKIKX0DER | 2022-10-11T16:26:49Z | 2022-10-11-16-30-09 | 2022-10-11 16:26:49 | 2022-10-11 16:30:09 | 3.33 |
| 2965765 | ENTITY31HC6VQONPBOD | sp | 7.64 | 43505528567681 | CAMPAIGN | 66.23 | budget-usage | ATVPDKIKX0DER | 2022-10-11T16:27:27Z | 2022-10-11-16-30-09 | 2022-10-11 16:27:27 | 2022-10-11 16:30:09 | 2.7 |
| 2965766 | ENTITY1UOBRQ39USYG7 | sb | 100.0 | 144290983053228187 | CAMPAIGN | 96.31 | budget-usage | A2Q3Y263D00KWC | 2022-10-11T16:25:29Z | 2022-10-11-16-30-09 | 2022-10-11 16:25:29 | 2022-10-11 16:30:09 | 4.67 |


In [0]:
df_ps[['budget', 'budget_usage_percentage']].describe()

| | budget | budget_usage_percentage |
|---|---|---|
| count | 3458467.0 | 3458467.0 |
| mean | 55136.85 | 44.12482 |
| std | 23438720.0 | 300.5584 |
| min | 1.0 | 0.0 |
| 25% | 16.55 | 15.31 |
| 50% | 50.0 | 36.95 |
| 75% | 150.0 | 67.89 |
| max | 10000000000.0 | 453553.0 |


In [0]:
df_ps.isnull().sum()

Out[7]: advertiser_id                  0
advertising_product_type    1614
budget                         0
budget_scope_id                0
budget_scope_type              0
budget_usage_percentage        0
dataset_id                     0
marketplace_id                 0
usage_updated_timestamp        0
file_date_time                 0
usage_updated_timestamp2       0
file_date_time2                0
minutes_diff                   0
dtype: int64
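Once the null counts are known, the missing values can be filled or the affected rows dropped. The sketch below uses plain pandas on three made-up rows so that it runs without a cluster; `fillna` with a per-column dictionary and `dropna(subset=...)` are also available in pandas API on Spark. The placeholder value `"unknown"` is an arbitrary choice.

```python
import pandas as pd

# Hypothetical rows; one is missing advertising_product_type.
df = pd.DataFrame(
    {
        "advertising_product_type": ["sp", None, "sb"],
        "budget": [389.96, 56.3, 100.0],
    }
)

# Count nulls per column, as in the cell above.
null_counts = df.isnull().sum()

# Option 1: replace nulls in one column with a placeholder value.
filled = df.fillna({"advertising_product_type": "unknown"})

# Option 2: drop the rows where that column is null.
dropped = df.dropna(subset=["advertising_product_type"])

print(null_counts, len(filled), len(dropped))
```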

In [0]:
df_ps.marketplace_id.value_counts(normalize=True)

Out[8]: ATVPDKIKX0DER     0.867564
A2EUQ1WTGCTBG2    0.108485
A1AM78C64UM0Y8    0.020884
A2Q3Y263D00KWC    0.003066
Name: marketplace_id, dtype: float64

In [0]:
df_ps.advertiser_id.nunique()

Out[6]: 347

In [0]:
df_ps.groupby(['advertiser_id']).budget_scope_id.count()

Out[16]: advertiser_id
ENTITYMNC7R8USIYO0       1574
ENTITY24NJD5I0B4993       229
ENTITYFCTZKK8S6F4L       6389
ENTITY1VPEBRSCE0LIU      1312
ENTITY2ENDW3U3WOIVO      3627
ENTITY2H8KY25F5EHUE     19227
ENTITY34I5D9IPGCP7Q      3761
ENTITYDSK49EZJPS26      19895
ENTITY1B9CKJJA1CBXB     10563
A32U15M077H6Z6          33465
ENTITY13BBG132ZA6AC      1386
ENTITY1VVK8FT70Q49K      2079
ENTITY1B3YW5IYOJKZO      2189
ENTITY1Z2AOJR3WZS3C      4659
ENTITY39NGZ7JCOM0XF      2993
ENTITY31Q1I62V7AXFV     20263
ENTITY14RQ3TMEX6QX9       562
ENTITY2Q91U4F9Z1Z31      3341
ENTITY23JNRRL6QJIRJ     45724
ENTITY1MIJBOTNKFBPT      2018
A9QQ3BYDJ7ZME             652
ENTITY3AWOI05X3LTRT       418
ENTITY27IK50HSCLGS4      7611
ENTITY20X0GIASH11T2     29811
ENTITY2PFKVKN01H12D      6184
ENTITY1LH09PWZAQ6O6      7804
ENTITY3CKZT8K66T25U      3354
ENTITY12Y9WGZMKEYGU       234
ENTITYJZNUHFPIDL70       1950
ENTITY166W73YKEFIMC      5514
ENTITYW7HEDFPP3RGZ        135
ENTITY2EOA95OWO0CRI       595
ENTITY2BZP7VN7ARJ

In [0]:
df_ps[['advertiser_id','marketplace_id']].drop_duplicates().head()

| | advertiser_id | marketplace_id |
|---|---|---|
| 116602 | A19EHJRIAHY95T | ATVPDKIKX0DER |
| 25646 | A1F3R2IX95F1NB | ATVPDKIKX0DER |
| 842 | A1FLHWKA43KGDZ | ATVPDKIKX0DER |
| 14750 | ENTITY12Y9WGZMKEYGU | ATVPDKIKX0DER |
| 385 | ENTITY189I4QBHE1O2H | ATVPDKIKX0DER |


In [0]:
df_ps.groupby('advertiser_id')['budget'].mean()

Out[22]: advertiser_id
ENTITY24NJD5I0B4993    2.621467e+01
ENTITYMNC7R8USIYO0     1.260606e+03
ENTITY2ENDW3U3WOIVO    1.150185e+03
ENTITYFCTZKK8S6F4L     2.206336e+01
ENTITY1VPEBRSCE0LIU    2.022652e+02
ENTITY2H8KY25F5EHUE    1.142497e+02
ENTITY34I5D9IPGCP7Q    3.600178e+01
ENTITYDSK49EZJPS26     3.716014e+01
A32U15M077H6Z6         2.362492e+01
ENTITY13BBG132ZA6AC    2.116162e+00
ENTITY1B9CKJJA1CBXB    5.890321e+01
ENTITY1VVK8FT70Q49K    7.193233e+02
ENTITY1B3YW5IYOJKZO    4.721106e+01
ENTITY1Z2AOJR3WZS3C    8.279625e+01
ENTITY39NGZ7JCOM0XF    1.216998e+02
ENTITY31Q1I62V7AXFV    2.597794e+01
ENTITY14RQ3TMEX6QX9    2.041708e+02
ENTITY2Q91U4F9Z1Z31    7.229259e+01
ENTITY23JNRRL6QJIRJ    4.547162e+02
ENTITY1MIJBOTNKFBPT    1.147126e+02
A9QQ3BYDJ7ZME          5.444785e+01
ENTITY3AWOI05X3LTRT    7.793900e+00
ENTITY27IK50HSCLGS4    2.563822e+01
ENTITY20X0GIASH11T2    6.210792e+01
ENTITY3CKZT8K66T25U    5.701613e+01
ENTITY1LH09PWZAQ6O6    2.883240e+02
ENTITY2PFKVKN01H12D    6.169907e+02
ENTIT

In [0]:
filtered = df_ps[df_ps.marketplace_id != 'ATVPDKIKX0DER']
filtered.shape

Out[9]: (458024, 13)

In [0]:
filtered = df_ps[df_ps.advertiser_id.str.contains("B9CK")]
filtered.shape

Out[10]: (10563, 13)

In [0]:
filtered = df_ps[df_ps.usage_updated_timestamp2.dt.month==9]
set(filtered.usage_updated_timestamp2.dt.month.to_numpy())


Out[13]: {9}
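The three filters above can also be combined into a single boolean mask. Each condition needs its own parentheses, because `&` binds more tightly than `!=` and `==`. The sketch uses plain pandas and three made-up rows so it runs locally; the same mask expression is assumed to work on `df_ps`.

```python
import pandas as pd

# Hypothetical rows reusing identifiers that appear in the outputs above.
df = pd.DataFrame(
    {
        "advertiser_id": ["ENTITY1B9CKJJA1CBXB", "ENTITY1B9CKJJA1CBXB", "A9QQ3BYDJ7ZME"],
        "marketplace_id": ["ATVPDKIKX0DER", "A2Q3Y263D00KWC", "A2Q3Y263D00KWC"],
        "usage_updated_timestamp2": pd.to_datetime(
            ["2022-09-30 23:59:00", "2022-10-11 16:25:29", "2022-10-11 16:26:02"]
        ),
    }
)

# Combine conditions element-wise with & (and), | (or), ~ (not).
mask = (
    (df.marketplace_id != "ATVPDKIKX0DER")
    & df.advertiser_id.str.contains("B9CK")
    & (df.usage_updated_timestamp2.dt.month == 10)
)
filtered = df[mask]

print(filtered.shape)
```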

In [0]:
df_ps.groupby('advertiser_id').mean().head()

| advertiser_id | budget | budget_usage_percentage | minutes_diff |
|---|---|---|---|
| ENTITYMNC7R8USIYO0 | 1260.605972 | 39.793698 | -4.126881 |
| ENTITY24NJD5I0B4993 | 26.214672 | 51.125415 | -3.965721 |
| ENTITY2ENDW3U3WOIVO | 1150.184778 | 32.524014 | -4.274017 |
| ENTITYFCTZKK8S6F4L | 22.063356 | 56.087934 | -4.247962 |
| ENTITY1VPEBRSCE0LIU | 202.265244 | 50.715739 | -4.33625 |
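When only some columns are of interest, or different columns need different aggregations, named aggregation is a possible alternative to `mean()` over every numeric column. The sketch below runs on plain pandas with made-up rows and output names (`mean_budget`, `max_usage`). pandas API on Spark documents named aggregation through `ps.NamedAgg`, which appears in the `dir(ps)` listing; whether the tuple shorthand used here is accepted there as well is an assumption to verify on your Spark version.

```python
import pandas as pd

# Hypothetical rows; column names follow the dataset above.
df = pd.DataFrame(
    {
        "advertiser_id": ["A", "A", "B"],
        "budget": [100.0, 300.0, 50.0],
        "budget_usage_percentage": [40.0, 60.0, 10.0],
    }
)

# Each keyword names an output column: (input column, aggregation).
summary = (
    df.groupby("advertiser_id")
    .agg(
        mean_budget=("budget", "mean"),
        max_usage=("budget_usage_percentage", "max"),
    )
    .sort_index()
)

print(summary)
```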


In [0]:
df_ps.dtypes

Out[11]: advertiser_id                       object
advertising_product_type            object
budget                             float64
budget_scope_id                     object
budget_scope_type                   object
budget_usage_percentage            float64
dataset_id                          object
marketplace_id                      object
usage_updated_timestamp             object
file_date_time                      object
usage_updated_timestamp2    datetime64[ns]
file_date_time2             datetime64[ns]
minutes_diff                       float64
dtype: object

In [0]:
df_ps.groupby('advertiser_id')['budget'].apply(np.cumsum).head(20)

Out[28]: 2074400        25.0
2075115       225.0
2075116       425.0
2075671       475.0
2076615       525.0
2077354       575.0
2078626       600.0
2078825       650.0
2079322       675.0
2080289       700.0
2080466       750.0
2080665       950.0
2081030       965.0
2081564      1122.5
2081962      1322.5
2081963      1347.5
2082556      1372.5
2082748      1472.5
2082950      1522.5
2083337      1572.5
2084405      1597.5
2084578      1622.5
2084967      1647.5
2085688      1672.5
2085887      1697.5
2086072      1722.5
2087014      1772.5
2087206      1797.5
2087207      1822.5
2087795      1897.5
2087796      1947.5
2088155      1972.5
2088302      2022.5
2088685      2072.5
2089864      2122.5
2090606      2172.5
2092775      2222.5
2093163      2247.5
2093564      2257.5
2094867      2272.5
2095522      2372.5
2097249      2397.5
2097762      2407.5
2100597      2432.5
2100602      2532.5
2100799      2732.5
2100977      2932.5
2101537      3132.5
2102049      3332.5
2103325    
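A per-group running total can also be written with the built-in `cumsum()` on the grouped column instead of `apply(np.cumsum)`. The sketch below uses plain pandas and made-up rows so it runs locally; `GroupBy.cumsum` also exists in pandas API on Spark, where a built-in is assumed to be the cheaper route because `apply` has to run an arbitrary Python function.

```python
import pandas as pd

# Hypothetical rows for two advertisers, interleaved on purpose.
df = pd.DataFrame(
    {
        "advertiser_id": ["A", "B", "A", "B", "A"],
        "budget": [25.0, 10.0, 200.0, 5.0, 200.0],
    }
)

# Running total of budget within each advertiser, aligned to the original rows.
running_total = df.groupby("advertiser_id")["budget"].cumsum()

print(running_total.tolist())
```

On a distributed DataFrame the result depends on row order within each group, so sorting by a timestamp column first may be needed for a meaningful running total.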

In [0]:
df_ps.marketplace_id.unique()

Out[37]: 0    A1AM78C64UM0Y8
1    A2Q3Y263D00KWC
2     ATVPDKIKX0DER
3    A2EUQ1WTGCTBG2
Name: marketplace_id, dtype: object

In [0]:
df_ps.groupby('marketplace_id')['budget'].mean().reset_index().plot.pie(y='budget')

In [0]:
df_ps.groupby('budget_scope_id')['budget'].mean().sort_values(ascending = False).head(10).plot.barh()

In [0]:
df_ps['hour'] = df_ps['usage_updated_timestamp2'].dt.hour
df_ps.groupby('hour')['budget'].mean().plot.bar()

In [0]:
dir(df_ps.groupby('hour')['budget'])

Out[15]: ['__abstractmethods__',
 '__class__',
 '__class_getitem__',
 '__delattr__',
 '__dict__',
 '__dir__',
 '__doc__',
 '__eq__',
 '__format__',
 '__ge__',
 '__getattr__',
 '__getattribute__',
 '__gt__',
 '__hash__',
 '__init__',
 '__init_subclass__',
 '__le__',
 '__lt__',
 '__module__',
 '__ne__',
 '__new__',
 '__orig_bases__',
 '__parameters__',
 '__reduce__',
 '__reduce_ex__',
 '__repr__',
 '__setattr__',
 '__sizeof__',
 '__slots__',
 '__str__',
 '__subclasshook__',
 '__weakref__',
 '_abc_impl',
 '_agg_columns',
 '_agg_columns_scols',
 '_agg_columns_selected',
 '_apply_series_op',
 '_as_index',
 '_build',
 '_cleanup_and_return',
 '_column_labels_to_exclude',
 '_dropna',
 '_groupkeys',
 '_groupkeys_scols',
 '_is_protocol',
 '_limit',
 '_make_pandas_df_builder_func',
 '_prepare_group_map_apply',
 '_psdf',
 '_psser',
 '_reduce_for_stat_function',
 '_resolve_grouping',
 '_resolve_grouping_from_diff_dataframes',
 '_spark_group_map_apply',
 '_spark_groupby',
 'agg',
 'aggregate',
 'all

### Internal type mapping
The table below shows which NumPy data types are matched to which PySpark data types internally in pandas API on Spark.