In [1]:
#| default_exp retrieval

In [2]:
#| export
from istatapi.discovery import DataSet
from istatapi.base import ISTAT
import pandas as pd
import io

In [3]:
#| hide
from nbdev.showdoc import *
from fastcore.test import *
from nbdev import *

# Retrieval

> Functions to retrieve data from the ISTAT API


In [10]:
#| export
# First segment of the request path that `get_data` builds.
RESOURCE = "data"
# TODO: accept json response as well (?)


def get_data(dataset: DataSet, **kwargs):
    """Download the filtered `dataset` and return it as a `pandas.DataFrame`.

    The request path is `RESOURCE`, the dataset's ``df_id`` identifier and the
    key built by `make_url_key` from the dataset's filters, joined with ``/``;
    the response is requested as CSV. If the result has a ``TIME_PERIOD``
    column it is converted to datetimes and the rows are sorted by it.
    `**kwargs` is accepted but not used.
    """
    flow_id = dataset.identifiers["df_id"]
    url_key = make_url_key(dataset.filters)
    endpoint = "/".join((RESOURCE, flow_id, url_key))

    csv_response = ISTAT()._request(endpoint, headers={"Accept": "text/csv"})
    df = pd.read_csv(io.StringIO(csv_response.text))

    if "TIME_PERIOD" not in df.columns:
        return df

    # Try pandas' default parsing first; if it raises, retry with
    # format="mixed".
    periods = df["TIME_PERIOD"].astype(str)
    try:
        parsed = pd.to_datetime(periods)
    except ValueError:
        parsed = pd.to_datetime(periods, format="mixed")
    df["TIME_PERIOD"] = parsed

    return df.sort_values(by=["TIME_PERIOD"])


def make_url_key(filters: dict):
    """Build the key segment of the request path from `filters`.

    The values of `filters` are concatenated in the mapping's iteration order
    (the dictionary keys themselves are not part of the result). Each value is
    rendered as follows:

    * a list or tuple is rendered as its items joined with ``"+"``;
    * any other value is rendered as ``str(value)``.

    Every value except the first is prefixed with ``"."``, unless the value
    right before it is exactly ``"."``.

    Returns the resulting string (``""`` when `filters` is empty).
    Raises `TypeError` if a value, or an item of a list/tuple value, is None.
    """

    def _as_text(item):
        # None never had a meaningful rendering; keep rejecting it instead of
        # silently emitting the text "None" into the request path.
        if item is None:
            raise TypeError("filter values must not be None")
        return str(item)

    segments = []
    previous = None
    for position, value in enumerate(filters.values()):
        if isinstance(value, (list, tuple)):
            segment = "+".join(_as_text(item) for item in value)
        else:
            segment = _as_text(value)

        # A previous value of "." already acts as the separator.
        if position != 0 and previous != ".":
            segment = "." + segment

        segments.append(segment)
        previous = value

    return "".join(segments)

In [5]:
show_doc(get_data)

---

[source](https://github.com/Attol8/istatapi/blob/master/istatapi/retrieval.py#L17){target="_blank" style="float:right; font-size:smaller"}

### get_data

>      get_data (dataset:istatapi.discovery.DataSet, **kwargs)

*returns a dataframe of the filtered 'dataset'*

In [6]:
#initiate the dataset
dataset = DataSet(dataflow_identifier="139_176")

#define some filters
freq = "M"
tipo_dato=["ISAV", "ESAV"]
paese_partner="WORLD"

#set filters
dataset.set_filters(freq=freq, tipo_dato=tipo_dato, paese_partner=paese_partner)
#return dataset
trade_df = get_data(dataset)
trade_df.head()
test_eq(trade_df['FREQ'].unique(), ['M'])
# `sorted()` returns a new list; `.sort()` sorts in place and returns None,
# which would reduce these checks to `None == None`.
test_eq(sorted(trade_df['TIPO_DATO'].unique()), sorted(tipo_dato))
test_eq(sorted(trade_df['PAESE_PARTNER'].unique()), [paese_partner])
test_eq(type(trade_df['TIME_PERIOD'].iloc[0]), pd.Timestamp)

In [7]:
# test annual dataset
dataset = DataSet(dataflow_identifier="144_125")

#define some filters
freq = "A"

#set filters
dataset.set_filters(freq=freq)
#return dataset
annual_df = get_data(dataset)

test_eq(annual_df['FREQ'].unique(), ['A'])
# `pd.Timestamp` is the public name of the class; avoid the private module path.
test_eq(type(annual_df['TIME_PERIOD'].iloc[0]), pd.Timestamp)

In [8]:
# test another dataset
ds = DataSet(dataflow_identifier="22_315")
freq = 'A'
sesso = '9'
itter107 = ['IT', 'ITG', 'ITF']

ds.set_filters(freq = freq, itter107 = itter107, sesso = sesso)
df = get_data(ds)

test_eq(df['FREQ'].unique(), ['A'])
# `sorted()` returns a new list; `.sort()` returns None and would make the
# comparison vacuous.
test_eq(sorted(df['ITTER107'].unique()), sorted(itter107))
test_eq(df['SESSO'].unique(), [9])
test_eq(type(df['TIME_PERIOD'].iloc[0]), pd.Timestamp)

In [12]:
# Dataset from https://github.com/Attol8/istatapi/issues/24, requested
# without setting any filter.
issue_24_ds = DataSet(dataflow_identifier="151_914")
issue_24_df = get_data(issue_24_ds)

# the returned dataframe must contain at least one row
test_eq(issue_24_df.empty, False)

ConnectionError: HTTPConnectionPool(host='sdmx.istat.it', port=80): Max retries exceeded with url: /SDMXWS/rest/datastructure/IT1/DCCV_TAXDISOCCU1 (Caused by NewConnectionError('<urllib3.connection.HTTPConnection object at 0x1275fb0d0>: Failed to establish a new connection: [Errno 12] Cannot allocate memory'))

In [None]:
#| hide
# Run nbdev's export step (presumably writes the `#| export` cells to the
# module named by `default_exp` -- confirm against the nbdev docs).
import nbdev; nbdev.nbdev_export()