# Ingest from an Array and Fetch Data

In [None]:
import csv
import requests

from io import StringIO
from requests.adapters import HTTPAdapter

from urllib3.util.retry import Retry
from urllib.parse import urlencode

import pandas as pd

In [None]:
def ingest_from_array(rows, datasource, token, mode='append', endpoint='https://api.tinybird.co'):
    """POST ``rows`` as CSV to ``{endpoint}/v0/datasources``.

    Rows are serialized with ``csv.writer`` (``QUOTE_NONNUMERIC``) into an
    in-memory buffer, which is posted as the ``csv`` multipart field. The
    buffer is flushed mid-stream when the size thresholds below are exceeded,
    and once more after the last row for whatever is still buffered.

    Failures are best-effort: a request error or a response with status
    >= 400 is printed, the buffered data is kept, and it is sent again with
    the next flush. Nothing is raised for a failed flush.

    Args:
        rows: Any iterable of row sequences (lists, tuples, ...). It is
            consumed once; generators are accepted.
        datasource: Value of the ``name`` query parameter.
        token: Sent as ``Authorization: Bearer <token>``.
        mode: Value of the ``mode`` query parameter (the cells in this file
            use ``'create'`` and ``'append'``).
        endpoint: Base URL of the API.

    Returns:
        None. Progress and errors are reported with ``print``.
    """
    # Encode the query string so names with reserved characters stay intact.
    query = urlencode({'mode': mode, 'name': datasource})
    url = f'{endpoint}/v0/datasources?{query}'
    headers = {
        'Authorization': f'Bearer {token}',
        'X-TB-Client': 'pltx-0.1',
    }

    max_wait_records = 5000
    max_wait_bytes = 32 * 1024 ** 2

    def new_buffer():
        """Return a fresh (buffer, csv writer) pair."""
        buffer = StringIO()
        writer = csv.writer(buffer, delimiter=',', quotechar='"', quoting=csv.QUOTE_NONNUMERIC)
        return buffer, writer

    def flush(session, buffer):
        """Post the buffered CSV; return True only when the API accepted it."""
        data = buffer.getvalue()
        try:
            response = session.post(url, headers=headers, files=dict(csv=data))
            result = response.json()

            if response.status_code >= 400:
                # Report rejected requests instead of dropping them silently.
                print(f"Failed to flush {len(data)} bytes, datasource={datasource}, response={response.status_code}")
                print(f"Result error={result.get('error', False)}")
                return False

            print(f"Flushed {len(data)} bytes, datasource={datasource}, response={response.status_code}")
            print(f"Result id={result.get('import_id', None)}, error={result.get('error', False)}")
            return True
        except Exception as e:
            # Deliberately broad: ingestion is best-effort, so report and go on.
            print(e)
            return False

    # The context manager closes the session (and its pooled connections).
    with requests.Session() as session:
        retry = Retry(total=5, backoff_factor=0.2)
        adapter = HTTPAdapter(max_retries=retry)
        session.mount('http://', adapter)
        session.mount('https://', adapter)

        buffer, writer = new_buffer()
        records = 0  # rows written since the call started (never reset)
        pending = 0  # rows currently sitting in the buffer

        for row in rows:
            writer.writerow(row)
            records += 1
            pending += 1

            # Both thresholds must be exceeded for a mid-stream flush.
            # NOTE(review): StringIO.tell() is a position in the text buffer,
            # not an encoded byte count, and `records` is a running total --
            # confirm whether `or` / a per-chunk counter was intended.
            if records > max_wait_records and buffer.tell() > max_wait_bytes:
                if flush(session, buffer):
                    buffer, writer = new_buffer()
                    pending = 0

        # Send whatever is left; this does not need len(rows), so unsized
        # iterables work too.
        if pending:
            flush(session, buffer)

    print('Done')

In [None]:
def fetch_table(token, table_name, endpoint='https://api.tinybird.co', output_format='JSON'):
    """Run ``select * from <table_name>`` through ``{endpoint}/v0/sql``.

    Args:
        token: Sent as ``Authorization: Bearer <token>``.
        table_name: Name interpolated verbatim into the SQL text.
            NOTE: it is not escaped, so only pass trusted names.
        endpoint: Base URL of the API.
        output_format: Appended to the query as ``FORMAT <output_format>``.

    Returns:
        The decoded JSON body when ``output_format`` is ``'JSON'``, otherwise
        the raw response text.

    Raises:
        Exception: If the response status code is not 200; the message
            includes the SQL and the response body.
    """
    sql = f'select * from {table_name}'
    params = {'q': f'{sql} FORMAT {output_format}'}
    url = f'{endpoint}/v0/sql?{urlencode(params)}'

    # The context manager closes the session once the response has been read.
    with requests.Session() as session:
        session.headers['Authorization'] = f'Bearer {token}'
        response = session.get(url)

        if response.status_code == 200:
            if output_format == 'JSON':
                return response.json()
            return response.text

        raise Exception(f'failed to fetch {sql}: {response.text}')

# Create Data Source from Array

In [None]:
# API host and Data Source name shared by the cells below.
endpoint = 'https://api.tinybird.co'
datasource = 'array_eg'
# The token is left commented out on purpose; define `token` before running
# the cells that pass it to ingest_from_array / fetch_table.
#token = '{TOKEN}'

In [None]:
# First ingestion: post a header row plus two data rows with mode='create'.
rows = [
    ["a", "b", "c"],
    [1, 2, 3],
    [4, 5, 6],
]
mode = 'create'

ingest_from_array(rows, datasource, token, mode=mode, endpoint=endpoint)

Flushed 27 bytes, datasource=array_eg, response=200
Result id=8296957c-4656-4d82-b19c-f51c52152c4a, error=False
Done


# Append to Data Source from Array

In [None]:
# Second ingestion: add two more rows to the same Data Source.
mode = 'append'
rows = [
    [7, 8, 9],
    [10, 11, 12],
]

ingest_from_array(rows, datasource, token, mode=mode, endpoint=endpoint)

Flushed 17 bytes, datasource=array_eg, response=200
Result id=66c11d25-41e2-49cb-b036-aa7a53d1c60e, error=False
Done


# Fetch from Data Source to DataFrame

In [None]:
# Query every row of the Data Source and keep the parsed JSON response.
fetched = fetch_table(token=token, table_name=datasource, endpoint=endpoint)
fetched

{'data': [{'a': 7, 'b': 8, 'c': 9},
  {'a': 4, 'b': 5, 'c': 6},
  {'a': 1, 'b': 2, 'c': 3},
  {'a': 10, 'b': 11, 'c': 12}],
 'meta': [{'name': 'a', 'type': 'Int16'},
  {'name': 'b', 'type': 'Int16'},
  {'name': 'c', 'type': 'Int16'}],
 'rows': 4,
 'statistics': {'bytes_read': 24, 'elapsed': 0.000678376, 'rows_read': 4}}

In [None]:
# Map each column name to the type reported in the response's `meta` list.
types = {column['name']: column['type'] for column in fetched['meta']}
types

{'a': 'Int16', 'b': 'Int16', 'c': 'Int16'}

In [None]:
# Build the frame from the list of row dicts, then cast each column to the
# type name collected in `types`.
# NOTE(review): this relies on the reported type names (here 'Int16') also
# being dtype names pandas accepts -- confirm for other column types.
df = pd.DataFrame.from_dict(fetched['data'])
df = df.astype(types)
df

Unnamed: 0,a,b,c
0,7,8,9
1,4,5,6
2,1,2,3
3,10,11,12


In [None]:
# Print a summary of the frame: index range, columns, non-null counts, dtypes
# and memory usage.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4 entries, 0 to 3
Data columns (total 3 columns):
 #   Column  Non-Null Count  Dtype
---  ------  --------------  -----
 0   a       4 non-null      Int16
 1   b       4 non-null      Int16
 2   c       4 non-null      Int16
dtypes: Int16(3)
memory usage: 164.0 bytes
