Преобразование данных .tsv.gz в parquet.gz

In [2]:
import pandas as pd
import random
import gzip

num_rows = 1000000

# Генерируем данные
data = {
    'ID': range(1, num_rows + 1),
    'Name': ['User' + str(i) for i in range(1, num_rows + 1)],
    'Age': [random.randint(18, 65) for _ in range(1, num_rows + 1)],
    'City': ['City' + str(random.randint(1, 10)) for _ in range(1, num_rows + 1)],
    'Country': ['Country' + str(random.randint(1, 5)) for _ in range(1, num_rows + 1)],
    'Gender': ['Male' if random.random() < 0.5 else 'Female' for _ in range(1, num_rows + 1)],
    'Salary': [random.randint(30000, 100000) for _ in range(1, num_rows + 1)],
    'Education': [random.choice(['Bachelor', 'Master', 'PhD']) for _ in range(1, num_rows + 1)],
    'Employment': [random.choice(['Full-time', 'Part-time', 'Self-employed', 'Unemployed']) for _ in range(1, num_rows + 1)],
    'Marital_Status': [random.choice(['Single', 'Married', 'Divorced', 'Widowed']) for _ in range(1, num_rows + 1)]
}

df = pd.DataFrame(data)

test_file_path = 'data\\table-test.tsv.gz'

with gzip.open(test_file_path, 'wt', encoding='utf-8') as f:
    df.to_csv(f, sep='\t', index=False)

print(f'Исходный файл с {num_rows} строками создан.')

Исходный файл с 1000000 строками создан.


In [12]:
import gzip
import pandas as pd

input_file = 'C:\\_pipeline\\beeline_2023-09_radio_by_s2.tsv.gz'
output_file = 'C:\\_pipeline\\beeline_2023-09_radio_by_s2_p17.csv'
# Размер блока данных (количество строк) для чтения
chunksize = 1000

with gzip.open(input_file, 'rt', encoding='utf-8') as file:
    for i, chunk in enumerate(pd.read_csv(file, sep='\t', chunksize=chunksize)):
        df_chunk = pd.DataFrame(chunk)
        df_filtred = df_chunk.loc[df_chunk['s2_level'] == 's2_p17']

        if len(df_filtred) > 0:
            break

df_filtred = df_filtred.drop(columns=['s2_level'])
df_filtred.to_csv(output_file, sep='\t', index=False)

In [8]:
import gzip
import pandas as pd

input_file = 'C:\\_pipeline\\beeline_2023-09_radio_by_s2.tsv.gz'
output_file = 'C:\\_pipeline\\beeline_2023-09_radio_by_s2.csv'

# Размер блока данных (количество строк) для чтения
chunksize = 1000

with gzip.open(input_file, 'rt', encoding='utf-8') as file:
        first_chunk = next(pd.read_csv(file, sep='\t', chunksize=chunksize))

first_chunk.to_csv(output_file, sep='\t', index=False)

print(first_chunk.head())

  s2_level             s2_value CellType   fielddate  radio_devices  \
0   s2_p13  4627401733260181504      LTE  2023-09-04            NaN   
1   s2_p13  4627401836339396608      LTE  2023-09-28            NaN   
2   s2_p13  4627402351735472128      LTE  2023-09-16            NaN   
3   s2_p13  4627402798412070912      LTE  2023-09-25            NaN   
4   s2_p13  4627403897923698688      LTE  2023-09-18            NaN   

   radio_devices_min  radio_devices_max  radio_devices_position  \
0               14.0               14.0                     NaN   
1               15.0               15.0                     NaN   
2               11.0               11.0                     NaN   
3               23.0               42.0                     NaN   
4               19.0               19.0                     NaN   

   radio_measurements  radio_measurements_min  ...  SignalStrength_p80_max  \
0                 NaN                   245.0  ...              -98.000000   
1             

In [8]:
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import gzip

input_file = 'C:\\_pipeline\\beeline_2023-09_radio_by_s2_month_binary.tsv.gz'
output_file = 'C:\\_pipeline\\beeline_2023-09_radio_by_s2h17_month_binary.parquet.gz'

# Определите размер блока (количество строк) для чтения
chunksize = 1000000

if os.path.exists(output_file):
    os.remove(output_file)

try:
    # Создайте схему данных из первой порции данных
    with gzip.open(input_file, 'rt', encoding='utf-8') as file:
        for i, chunk in enumerate(pd.read_csv(file, sep='\t', chunksize=chunksize)):
            df_chunk = pd.DataFrame(chunk)
            df_filtred = df_chunk.loc[df_chunk['s2_level'] == 's2_p17']

            if len(df_filtred) > 0:
                break

    df_filtred = df_filtred.drop(columns=['s2_level'])
    table = pa.Table.from_pandas(df_filtred)
    schema = table.schema
    pq.write_table(table, output_file, compression='gzip')

    rows_cnt = len(df_filtred)
    print(f'Записано строк данных: {rows_cnt}', end='\r')
        
    # Откройте gzip-файл и читайте данные по блокам
    with gzip.open(input_file, 'rt', encoding='utf-8') as file:
        with pq.ParquetWriter(output_file, schema, compression='gzip') as writer:
            for i, chunk in enumerate(pd.read_csv(file, sep='\t', chunksize=chunksize)):
                df_chunk = pd.DataFrame(chunk)
                df_filtred = df_chunk.loc[df_chunk['s2_level'] == 's2_p17']

                if len(df_filtred) > 0:
                    df_filtred = df_filtred.drop(columns=['s2_level'])
                    table = pa.Table.from_pandas(df_filtred, schema=schema)
                    writer.write_table(table)

                    rows_cnt = rows_cnt + len(df_filtred)
                    print(f'Записано строк данных: {rows_cnt}', end='\r')

except Exception as e:
    print(f'Произошла ошибка: {str(e)}')

finally:
    if 'table' in locals():
        table = None

print('\n Завершено')

Записано строк данных: 68685522
 Завершено


In [11]:
import pyarrow.parquet as pq

output_file = 'C:\\_pipeline\\beeline_2023-09_radio_by_s2h17_month_binary.parquet.gz'

table = pq.read_table(output_file)
# num_rows = 10  # Измените на нужное количество строк
# subset = table.slice(0, num_rows)
# df = subset.to_pandas()
# print(df.head())

df = table.to_pandas()

print(df.head())

num_records_in_parquet = len(df)
print(f'Число записей в Parquet-файле: {num_records_in_parquet}')

                    s2_value CellType  SignalStrength_p50  \
1126873  1390848764293414912      LTE              -105.0   
1126874  1391176878185775104      LTE                 NaN   
1126875  1392402686548115456      LTE                 NaN   
1126876  1393064275525763072      LTE                 NaN   
1126877  1393640029179346944      LTE               -93.0   

         SignalStrength_p50_other  
1126873                     False  
1126874                      True  
1126875                      True  
1126876                      True  
1126877                     False  
Число записей в Parquet-файле: 67812395


In [9]:
import os
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import gzip

input_file = 'C:\\_pipeline\\beeline_2023-09_radio_by_s2.tsv.gz'
output_file = 'C:\\_pipeline\\beeline_2023-09_radio_by_s2.parquet.gz'

# Определите размер блока (количество строк) для чтения
chunksize = 100000

if os.path.exists(output_file):
    os.remove(output_file)

try:
    # Запись первого блока данных
    with gzip.open(input_file, 'rt', encoding='utf-8') as file:
        first_chunk = next(pd.read_csv(file, sep='\t', chunksize=chunksize))
        # schema = pa.schema([pa.field(col, pa.string()) for col in first_chunk.columns])
        table = pa.Table.from_pandas(first_chunk)
        schema = table.schema
        pq.write_table(table, output_file, compression='gzip')
        

    # Запись данных по блокам
    with gzip.open(input_file, 'rt', encoding='utf-8') as file:
        with pq.ParquetWriter(output_file, schema, compression='gzip') as writer:
            for i, chunk in enumerate(pd.read_csv(file, sep='\t', chunksize=chunksize)):
                df_chunk = pd.DataFrame(chunk)
                table = pa.Table.from_pandas(df_chunk, schema=schema)
                writer.write_table(table)

                print(f'Записано строк данных: {(i + 1) * chunksize}', end='\r')

except Exception as e:
    print(f'Произошла ошибка: {str(e)}')

finally:
    # Закройте выходной файл в любом случае, чтобы избежать утечек ресурсов
    if 'table' in locals():
        table = None

    print('Завершено')

# Проверка записанных данных
table = pq.read_table(output_file)

num_rows_to_read = 10  # Измените на нужное количество строк
subset = table.head(num_rows_to_read)

df_subset = subset.to_pandas()
print(df_subset)

# Проверка записанных данных
table = pq.read_table(output_file)
df = table.to_pandas()

print(df.head(num_rows_to_read))

num_records_in_parquet = len(df)
print(f'Записано строк: {num_records_in_parquet}')

TypeError: __cinit__() got an unexpected keyword argument 'append'