In [29]:
import dask.dataframe as dd
from dask.distributed import Client, LocalCluster
from ipaddress import IPv4Address, summarize_address_range
from netaddr import spanning_cidr, IPNetwork
import os

In [2]:
cluster = LocalCluster(n_workers=4, threads_per_worker=1, dashboard_address=':37937')  # Launches a scheduler and workers locally
client = Client(cluster) 
cluster.dashboard_link


'http://127.0.0.1:37937/status'

In [3]:
ddf = dd.read_csv("/datasets/ip-geolocation/ipinfo/paid/standard_location/2023-04-22.standard_location.csv.gz", compression='gzip')

Please ensure that each individual file can fit in memory and
use the keyword ``blocksize=None to remove this message``
Setting ``blocksize=None``
  "Setting ``blocksize=None``" % compression


In [14]:
ddf.dtypes

start_ip        object
end_ip          object
join_key        object
city            object
region          object
country         object
latitude       float64
longitude      float64
postal_code     object
timezone        object
cidr            object
dtype: object

In [4]:
ddf = ddf.repartition(partition_size='10MB')

In [None]:
df =  ddf.repartition(partition_size='10MB').compute().head(20)

In [7]:
def get_spanning_prefix_for_range(start_ip, end_ip):
    #sip = IPv4Address(start_ip)
    #eip = IPv4Address(end_ip)
    return str(spanning_cidr([start_ip, end_ip]))

def get_net_summary(start_ip, end_ip):
    nets =  list(summarize_address_range(startip, endip))
    net_strs = [str(n) for n in nets]
    
def get_cidr(df):
    return df.apply(lambda x: get_spanning_prefix_for_range(x['start_ip'], x['end_ip']), axis=1)

def get_nets(df):
    return df.apply(lambda x: get_net_summary(x['start_ip'], x['end_ip']))

In [5]:
ddf['cidr'] = ddf.apply(lambda x: get_spanning_prefix_for_range(x['start_ip'], x['end_ip']), axis=1, meta=('start_ip', str))

In [None]:
df['cidr'] = df.apply(lambda x: get_spanning_prefix_for_range(x['start_ip'], x['end_ip']), axis=1)

In [21]:
ddf.to_parquet('/data1/manasvini/ipinfo/', overwrite=True, write_index=False, compression={"name": "gzip", "values": "lz4"} )

In [16]:
ddf['postal_code'] = ddf['postal_code'].astype(str).fillna('')

In [22]:
def get_dirs(dirname):
    dirs = [os.path.join(dirname, f) for f in os.listdir(dirname) if os.path.isdir(os.path.join(dirname, f))]
    return dirs
def get_files(dirname, pfx):
    files = [os.path.join(dirname, f) for f in os.listdir(dirname) if os.path.isfile(os.path.join(dirname, f)) and pfx in f]
    return files

In [89]:
ipinfo_ddf = dd.read_parquet(get_files('/data1/manasvini/ipinfo/', 'parquet'), blocksize='10MB' )

In [90]:
ipinfo_ddf.count().compute()


start_ip       75709833
end_ip         75709833
join_key       75709833
city           75709833
region         75666778
country        75706137
latitude       75709833
longitude      75709833
postal_code    75709833
timezone       75709833
cidr           75709833
dtype: int64

In [91]:
def get_24s_from_pfx(pfx):
    ip = IPNetwork(pfx)
    subnet_24s = list(ip.subnet(24))
    str_24s = []
    size = get_pfx_size(pfx)
    if size > 24:
        return [str(pfx).split('/')[0]]
    if size == 24:
        str_24s.append(str(pfx).split('/')[0])
        return str_24s
    for s in subnet_24s:
        str_24s.append(str(s).split('/')[0])
        #print(str(s))
    return str_24s

def get_pfx_size(pfx):
    return int(pfx.split('/')[1])

In [92]:
ipinfo_ddf['pfx_size'] = ipinfo_ddf.apply(lambda x: get_pfx_size(x['cidr']), axis=1, meta=('cidr', str))
ipinfo_ddf['/24_strs'] = ipinfo_ddf.apply(lambda x: get_24s_from_pfx(x['cidr']), axis=1, meta=('cidr', str))
ipinfo_ddf['pfx_size'] = ipinfo_ddf['pfx_size'].astype(int)

In [93]:
cidr_ddf = ipinfo_ddf[['cidr', '/24_strs']]
ipinfo_exploded_ddf = cidr_ddf.set_index('cidr').explode('/24_strs').reset_index()

In [94]:

ipinfo_exploded_ddf[ipinfo_exploded_ddf['cidr']=='106.193.100.0/23'].compute()

Unnamed: 0,cidr,/24_strs
865,106.193.100.0/23,106.193.100.0
866,106.193.100.0/23,106.193.101.0


In [96]:
os.makedirs('/data1/manasvini/ipinfo_expanded', exist_ok=True)
ipinfo_exploded_ddf.to_parquet('/data1/manasvini/ipinfo_expanded/', overwrite=True, write_index=False, compression={"name": "gzip", "values": "lz4"} )

In [3]:
a=[1]
'.'.join(a[:-1])


''