In [1]:
import pandas as pd
import os
import gc
import polars as pl

batch_size = 100000

### NPI files

In [2]:
data_dir = "nppes_zip_files_v2"

In [None]:
ls_file = []
ls_dir = []
for dir_name in os.listdir(data_dir):
    path = os.path.join(data_dir, dir_name)
    if os.path.isdir(path):
        for filename in os.listdir(path):

            # delete all "fileheader", they contain no information
            if "fileheader" in filename:
                file_path = os.path.join(data_dir,dir_name, filename)
                try:
                    os.remove(file_path)
                    print(f"Deleted: {filename}")
                except Exception as e:
                    print(f"Failed to delete {filename}: {e}")

            else:
                # delete duplicate files
                if filename in ls_file:
                    file_path = os.path.join(data_dir,dir_name, filename)
                    try:
                        os.remove(file_path)
                        print(f"Deleted: {filename}")
                    except Exception as e:
                        print(f"Failed to delete {filename}: {e}")
                # move other files to the main directory
                else:
                    ls_file.append(filename)
                    ls_dir.append(dir_name)
                    source_path = os.path.join(data_dir, dir_name, filename)
                    destination_path = os.path.join(data_dir, filename)        
                    os.rename(source_path, destination_path)

        # Remove the empty directory
        os.rmdir(os.path.join(data_dir, dir_name))  
            
        print(f"Processing directory: {dir_name}")

Deleted: endpoint_pfile_20250505-20250511_fileheader.csv
Deleted: npidata_pfile_20250505-20250511_fileheader.csv
Deleted: othername_pfile_20250505-20250511_fileheader.csv
Deleted: pl_pfile_20250505-20250511_fileheader.csv
Processing directory: NPPES_Data_Dissemination_050525_051125_Weekly_V2
Deleted: endpoint_pfile_20250512-20250518_fileheader.csv
Deleted: npidata_pfile_20250512-20250518_fileheader.csv
Deleted: NPPES_Data_Dissemination_CodeValues.pdf
Deleted: NPPES_Data_Dissemination_Readme_v.2.pdf
Deleted: othername_pfile_20250512-20250518_fileheader.csv
Deleted: pl_pfile_20250512-20250518_fileheader.csv
Processing directory: NPPES_Data_Dissemination_051225_051825_Weekly_V2
Deleted: endpoint_pfile_20250519-20250525_fileheader.csv
Deleted: npidata_pfile_20250519-20250525_fileheader.csv
Deleted: NPPES_Data_Dissemination_CodeValues.pdf
Deleted: NPPES_Data_Dissemination_Readme_v.2.pdf
Deleted: othername_pfile_20250519-20250525_fileheader.csv
Deleted: pl_pfile_20250519-20250525_fileheader.

In [None]:
df_files = pd.DataFrame({"filename": ls_file, "dir": ls_dir})
df_files.to_csv("nppes_zip_files_v2.csv", index=False)

#### data integration

In [None]:
# Some files are too large, so we choose to convert them into pickle format first
for filename in os.listdir(data_dir):
    path = os.path.join(data_dir, filename)
    if filename.endswith(".csv"):
        df = pl.read_csv(path, streaming=True)
        df.to_pickle(path.replace(".csv", ".pkl.zip"), compression="zip")
        os.remove(path)
    elif filename.endswith(".xlsx") or filename.endswith(".xls"):
        df = pd.read_excel(path)
        df.to_pickle(path.replace(".xlsx", ".pkl.zip").replace(".xls", ".pkl.zip"), compression="zip")
        os.remove(path)
    
    gc.collect()

  df = pd.read_csv(os.path.join(data_dir, filename))


In [None]:
import csv
import pandas as pd
import os

input_file = "nppes_zip_files_v2/npidata_pfile_20050523-20250511.csv"
output_dir = "npidata_batches"
batch_size = 100_000

# 确保输出目录存在
os.makedirs(output_dir, exist_ok=True)

# 打开文件，逐行读取
with open(input_file, mode="r", encoding="utf-8", newline='') as f:
    reader = csv.reader(f)
    header = next(reader)  # 获取表头

    batch = []
    batch_num = 1
    for i, row in enumerate(reader, 1):
        batch.append(row)

        if i % batch_size == 0:
            df = pd.DataFrame(batch, columns=header)
            df.to_pickle(os.path.join(output_dir, f"batch_{batch_num}.pkl.zip"), compression="zip")
            gc.collect()
            print(f"✅ Saved batch {batch_num} with {len(batch)} rows")
            batch_num += 1
            batch.clear()

    # 保存最后不足一批的数据
    if batch:
        df = pd.DataFrame(batch, columns=header)
        df.to_pickle(os.path.join(output_dir, f"batch_{batch_num}.pkl.zip", compression="zip"))
        gc.collect()
        print(f"✅ Saved final batch {batch_num} with {len(batch)} rows")


: 

In [None]:
# 遍历每一个 batch（Polars DataFrame）
for i, batch in enumerate(df.iter_batches(batch_size=batch_size)):
    print(f"📦 保存第 {i+1} 批，行数：{len(batch)}")
    
    # 转换为 pandas
    pandas_batch = batch.to_pandas()

    # 保存为 Pickle 文件
    output_path = os.path.join(, f"batch_{i+1}.pkl")
    pandas_batch.to_pickle(output_path)

    # 手动清理内存
    del pandas_batch, batch
    print(f"✅ 已保存：{output_path}")

In [None]:
df_endpoint = pd.DataFrame()
df_npi = pd.DataFrame()
df_othername = pd.DataFrame()
df_pl = pd.DataFrame()

for filename in ls_file:
    if filename.endswith(".pkl.zip"):
        path = os.path.join(data_dir, filename)
        df = pd.read_pickle(path)

        print(f"Processing file: {filename}")

        if "endpoint" in filename:
            if df_endpoint.empty:
                df_endpoint = df
            else:
                df_endpoint = pd.concat([df_endpoint, df], ignore_index=True)
            df_endpoint.to_csv(os.path.join(data_dir, "nppes_endpoint.pkl.zip"), compression="zip")

        elif "npi" in filename:
            if df_npi.empty:
                df_npi = df
            else:
                df_npi = pd.concat([df_npi, df], ignore_index=True)
            df_npi.to_csv(os.path.join(data_dir, "nppes_npi.pkl.zip"), compression="zip")

        elif "othername" in filename:
            if df_othername.empty:
                df_othername = df
            else:
                df_othername = pd.concat([df_othername, df], ignore_index=True)
            df_othername.to_csv(os.path.join(data_dir, "nppes_othername.pkl.zip"), compression="zip") 

        elif filename.startswith("pl_"):
            if df_pl.empty:
                df_pl = df
            else:
                df_pl = pd.concat([df_pl, df], ignore_index=True)
            df_pl.to_csv(os.path.join(data_dir, "nppes_pl.pkl.zip"), compression="zip")

        os.remove(path)

print("Data processing complete. Files saved:")

Processing file: endpoint_pfile_20250505-20250511.csv


  df = pd.read_csv(path)


Processing file: npidata_pfile_20250505-20250511.csv
Processing file: othername_pfile_20250505-20250511.csv
Processing file: pl_pfile_20250505-20250511.csv
Processing file: endpoint_pfile_20250512-20250518.csv


  df = pd.read_csv(path)


Processing file: npidata_pfile_20250512-20250518.csv
Processing file: othername_pfile_20250512-20250518.csv
Processing file: pl_pfile_20250512-20250518.csv
Processing file: endpoint_pfile_20250519-20250525.csv


  df = pd.read_csv(path)


Processing file: npidata_pfile_20250519-20250525.csv
Processing file: othername_pfile_20250519-20250525.csv
Processing file: pl_pfile_20250519-20250525.csv


  df = pd.read_csv(path)


Processing file: endpoint_pfile_20050523-20250511.csv


In [8]:
pd.read_csv("nppes_zip_files_v2/endpoint_pfile_20050523-20250511.csv")

  pd.read_csv("nppes_zip_files_v2/endpoint_pfile_20050523-20250511.csv")


Unnamed: 0,NPI,Endpoint Type,Endpoint Type Description,Endpoint,Affiliation,Endpoint Description,Affiliation Legal Business Name,Use Code,Use Description,Other Use Description,Content Type,Content Description,Other Content Description,Affiliation Address Line One,Affiliation Address Line Two,Affiliation Address City,Affiliation Address State,Affiliation Address Country,Affiliation Address Postal Code
0,1154324382,DIRECT,Direct Messaging Address,rclose13800@MHSDIRECT.NET,N,,,,,,,,,3501 Johnson St,,Hollywood,FL,US,330215421.0
1,1154324382,DIRECT,Direct Messaging Address,Richard.Close@SEP.EClinicalDirectPlus.com,N,,,DIRECT,Direct,,,,,500 N Hiatus,Ste 200,Pembroke Pines,FL,US,33026.0
2,1154324366,DIRECT,Direct Messaging Address,rrodriguez1359559@direct.sw.org,N,,,,,,,,,1605 S 31st St,,Temple,TX,US,765089299.0
3,1962405175,DIRECT,Direct Messaging Address,fredericstelzer@epgi.allscriptsdirect.net,N,,,,,,,,,1501 N Cedar Crest Blvd,suite 110,Allentown,PA,US,181042309.0
4,1699778894,DIRECT,Direct Messaging Address,aawomolo@direct.iuhealth.org,N,,,,,,,,,2401 W University Ave,,Muncie,IN,US,47303.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
572798,1073303384,DIRECT,Direct Messaging Address,records@airrosti-rehab-ctrs.direct.kno2fy.com,Y,,"Airrosti Rehab Centers, LLC",,,,,,,111 Tower Dr Bldg 1,,San Antonio,TX,US,782323625
572799,1942090220,REST,RESTful URL,https://api.practicebetter.io,N,,,DIRECT,Direct,,CSV,CSV,,4237 Salisbury Rd Ste 204,,Jacksonville,FL,US,322160906
572800,1871383034,CONNECT,CONNECT URL,resolve,N,,,DIRECT,Direct,,,,,333 N Braddock Ave,,Pittsburgh,PA,US,152082512
572801,1821888983,CONNECT,CONNECT URL,securerecords.sanford.html,N,,,,,,,,,1075 E Betteravia Rd Ste 201,,Santa Maria,CA,US,934547023
