[BUG] Fixed problems with generation of parquet files #93
Changes from all commits
dcd4735
3fdd501
04a1717
cb3fa59
7962aca
3451e01
@@ -5,102 +5,129 @@
import pandas as pd


class ParquetGenerator:
    # Flags recording whether each file set has already been written,
    # so repeated calls do not regenerate the files.
    GEN_KDE_PQ_CALLED = False
    GEN_PQ_TEST_CALLED = False

    @classmethod
    def gen_kde_pq(cls, file_name='kde.parquet', N=101):
        if not cls.GEN_KDE_PQ_CALLED:
            df = pd.DataFrame({'points': np.random.random(N)})
            table = pa.Table.from_pandas(df)
            row_group_size = 128
            pq.write_table(table, file_name, row_group_size)
            cls.GEN_KDE_PQ_CALLED = True

    @classmethod
    def gen_pq_test(cls):
        if not cls.GEN_PQ_TEST_CALLED:
            df = pd.DataFrame(
                {
                    'one': [-1, np.nan, 2.5, 3., 4., 6., 10.0],
                    'two': ['foo', 'bar', 'baz', 'foo', 'bar', 'baz', 'foo'],
                    'three': [True, False, True, True, True, False, False],
                    # float without NA
                    'four': [-1, 5.1, 2.5, 3., 4., 6., 11.0],
                    # str with NA
                    'five': ['foo', 'bar', 'baz', None, 'bar', 'baz', 'foo'],
                }
            )
            table = pa.Table.from_pandas(df)
            pq.write_table(table, 'example.parquet')
            pq.write_table(table, 'example2.parquet', row_group_size=2)
            cls.GEN_PQ_TEST_CALLED = True

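Because of the class-level flags, repeated calls are cheap no-ops, so any test that needs these files can simply call the generator. Below is a minimal sketch of such a call site; the test function name and the metadata check are illustrative and not part of this PR.

import pyarrow.parquet as pq

def test_uses_generated_kde_file():
    # The first call writes kde.parquet; the second returns immediately
    # because GEN_KDE_PQ_CALLED is already True.
    ParquetGenerator.gen_kde_pq()
    ParquetGenerator.gen_kde_pq()
    assert pq.ParquetFile('kde.parquet').metadata.num_rows == 101
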
def generate_spark_data():
    from pyspark.sql import SparkSession
    from pyspark.sql.types import (
        StructType, StructField, DateType, TimestampType)

    # test datetime64, spark dates
    dt1 = pd.DatetimeIndex(['2017-03-03 03:23',
                            '1990-10-23', '1993-07-02 10:33:01'])
    df = pd.DataFrame({'DT64': dt1, 'DATE': dt1.copy()})
    df.to_parquet('pandas_dt.pq')

    spark = SparkSession.builder.appName("GenSparkData").getOrCreate()
    schema = StructType([StructField('DT64', DateType(), True),
                         StructField('DATE', TimestampType(), True)])
    sdf = spark.createDataFrame(df, schema)
    sdf.write.parquet('sdf_dt.pq', 'overwrite')

    spark.stop()

Inline review discussion attached to sdf.write.parquet('sdf_dt.pq', 'overwrite'):

- Is it possible to move this directory (and the other generated data) under the "build" directory tree? I don't think we need it at the top level of the sources.
- The location of the test folder is quite good; as far as I can see, it is used in many large projects (for example, pandas). The generated data, however, really is out of place. I suggest leaving the test folder where it is and moving the generated data into it.
- I am ready to do it, but probably better in a separate PR.
- @shssf Can this PR be merged? Is there any blocker?
- @anmyachev yes, but let's wait for @fschlimb's approval.
- @shssf ok, thanks
- Yes, it is standard practice to have a test folder next to each source folder.

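Setting the directory-location question aside, the directory Spark writes can be read back as a single Parquet dataset to check the date and timestamp columns. This read-back is only a sketch for context and is not part of the generation script.

import pyarrow.parquet as pq

# sdf_dt.pq is a directory of Parquet files produced by Spark;
# pyarrow reads it as one dataset (Spark's _SUCCESS marker is ignored).
table = pq.read_table('sdf_dt.pq')
print(table.schema)            # DT64 was written as a Spark date, DATE as a timestamp
print(table.to_pandas().head())
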
def gen_lr(file_name, N, D):
    points = np.random.random((N, D))
    responses = np.random.random(N)
    f = h5py.File(file_name, "w")
    dset1 = f.create_dataset("points", (N, D), dtype='f8')
    dset1[:] = points
    dset2 = f.create_dataset("responses", (N,), dtype='f8')
    dset2[:] = responses
    f.close()

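As a quick sanity check (not part of this script), the HDF5 file produced by gen_lr can be opened with h5py to confirm the two datasets and their shapes:

import h5py

with h5py.File("lr.hdf5", "r") as f:
    # The file is expected to hold an (N, D) float64 "points" dataset
    # and a length-N float64 "responses" dataset.
    print(f["points"].shape, f["points"].dtype)
    print(f["responses"].shape, f["responses"].dtype)
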
def gen_kde_pq(file_name, N):
    df = pd.DataFrame({'points': np.random.random(N)})
    table = pa.Table.from_pandas(df)
    row_group_size = 128
    pq.write_table(table, file_name, row_group_size)


def gen_pq_test(file_name):
    df = pd.DataFrame({'one': [-1, np.nan, 2.5, 3., 4., 6., 10.0],
                       'two': ['foo', 'bar', 'baz', 'foo', 'bar', 'baz', 'foo'],
                       'three': [True, False, True, True, True, False, False],
                       'four': [-1, 5.1, 2.5, 3., 4., 6., 11.0],  # float without NA
                       'five': ['foo', 'bar', 'baz', None, 'bar', 'baz', 'foo'],  # str with NA
                       })
    table = pa.Table.from_pandas(df)
    pq.write_table(table, 'example.parquet')
    pq.write_table(table, 'example2.parquet', row_group_size=2)

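example2.parquet is written with row_group_size=2 so that readers are exercised against a file containing several row groups. A hedged sketch of how that can be verified with pyarrow (not part of this script):

import pyarrow.parquet as pq

pf = pq.ParquetFile('example2.parquet')
# With 7 rows and row_group_size=2, four row groups are expected (2+2+2+1).
print(pf.metadata.num_row_groups, pf.metadata.num_rows)
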
N = 101
D = 10
gen_lr("lr.hdf5", N, D)

arr = np.arange(N)
f = h5py.File("test_group_read.hdf5", "w")
g1 = f.create_group("G")
dset1 = g1.create_dataset("data", (N,), dtype='i8')
dset1[:] = arr
f.close()

gen_kde_pq('kde.parquet', N)
gen_pq_test('example.parquet')

df = pd.DataFrame({'A': ['bc'] + ["a"]*3 + ["bc"]*3 + ['a'], 'B': [-8, 1, 2, 3, 1, 5, 6, 7]})
df.to_parquet("groupby3.pq")

df = pd.DataFrame({"A": ["foo", "foo", "foo", "foo", "foo",
                         "bar", "bar", "bar", "bar"],
                   "B": ["one", "one", "one", "two", "two",
                         "one", "one", "two", "two"],
                   "C": ["small", "large", "large", "small",
                         "small", "large", "small", "small",
                         "large"],
                   "D": [1, 2, 2, 6, 3, 4, 5, 6, 9]})
df.to_parquet("pivot2.pq")

# test datetime64, spark dates
dt1 = pd.DatetimeIndex(['2017-03-03 03:23', '1990-10-23', '1993-07-02 10:33:01'])
df = pd.DataFrame({'DT64': dt1, 'DATE': dt1.copy()})
df.to_parquet('pandas_dt.pq')

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, TimestampType

spark = SparkSession.builder.appName("GenSparkData").getOrCreate()
schema = StructType([StructField('DT64', DateType(), True), StructField('DATE', TimestampType(), True)])
sdf = spark.createDataFrame(df, schema)
sdf.write.parquet('sdf_dt.pq', 'overwrite')

spark.stop()

# CSV reader test
data = ("0,2.3,4.6,47736\n"
        "1,2.3,4.6,47736\n"
        "2,2.3,4.6,47736\n"
        "4,2.3,4.6,47736\n")

with open("csv_data1.csv", "w") as f:
    f.write(data)

with open("csv_data_infer1.csv", "w") as f:
    f.write('A,B,C,D\n' + data)

data = ("0,2.3,2015-01-03,47736\n"
        "1,2.3,1966-11-13,47736\n"
        "2,2.3,1998-05-21,47736\n"
        "4,2.3,2018-07-11,47736\n")

with open("csv_data_date1.csv", "w") as f:
    f.write(data)

# generated data for parallel merge_asof testing
df1 = pd.DataFrame({'time': pd.DatetimeIndex(
    ['2017-01-03', '2017-01-06', '2017-02-15', '2017-02-21']),
    'B': [4, 5, 9, 6]})
df2 = pd.DataFrame({'time': pd.DatetimeIndex(
    ['2017-01-01', '2017-01-14', '2017-01-16', '2017-02-23', '2017-02-23',
     '2017-02-25']), 'A': [2, 3, 7, 8, 9, 10]})
df1.to_parquet("asof1.pq")
df2.to_parquet("asof2.pq")

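groupby3.pq and pivot2.pq feed the groupby and pivot_table tests. A rough sketch of the kind of operations they are meant to exercise (the actual test code is not shown in this diff):

import pandas as pd

df = pd.read_parquet("pivot2.pq")
# Aggregate D over A/C combinations, roughly what a pivot_table test would do.
print(pd.pivot_table(df, values='D', index='A', columns='C', aggfunc='sum'))

df = pd.read_parquet("groupby3.pq")
# Group B by the string key A, roughly what a groupby test would do.
print(df.groupby('A')['B'].sum())
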
def generate_other_data():
    N = 101
    D = 10
    gen_lr("lr.hdf5", N, D)

    arr = np.arange(N)
    f = h5py.File("test_group_read.hdf5", "w")
    g1 = f.create_group("G")
    dset1 = g1.create_dataset("data", (N,), dtype='i8')
    dset1[:] = arr
    f.close()

    df = pd.DataFrame({'A': ['bc'] + ["a"]*3 + ["bc"]*3 + ['a'], 'B': [-8, 1, 2, 3, 1, 5, 6, 7]})
    df.to_parquet("groupby3.pq")

    df = pd.DataFrame({"A": ["foo", "foo", "foo", "foo", "foo",
                             "bar", "bar", "bar", "bar"],
                       "B": ["one", "one", "one", "two", "two",
                             "one", "one", "two", "two"],
                       "C": ["small", "large", "large", "small",
                             "small", "large", "small", "small",
                             "large"],
                       "D": [1, 2, 2, 6, 3, 4, 5, 6, 9]})
    df.to_parquet("pivot2.pq")

    # CSV reader test
    data = ("0,2.3,4.6,47736\n"
            "1,2.3,4.6,47736\n"
            "2,2.3,4.6,47736\n"
            "4,2.3,4.6,47736\n")

    with open("csv_data1.csv", "w") as f:
        f.write(data)

    with open("csv_data_infer1.csv", "w") as f:
        f.write('A,B,C,D\n' + data)

    data = ("0,2.3,2015-01-03,47736\n"
            "1,2.3,1966-11-13,47736\n"
            "2,2.3,1998-05-21,47736\n"
            "4,2.3,2018-07-11,47736\n")

    with open("csv_data_date1.csv", "w") as f:
        f.write(data)

    # generated data for parallel merge_asof testing
    df1 = pd.DataFrame({'time': pd.DatetimeIndex(
        ['2017-01-03', '2017-01-06', '2017-02-15', '2017-02-21']),
        'B': [4, 5, 9, 6]})
    df2 = pd.DataFrame({'time': pd.DatetimeIndex(
        ['2017-01-01', '2017-01-14', '2017-01-16', '2017-02-23', '2017-02-23',
         '2017-02-25']), 'A': [2, 3, 7, 8, 9, 10]})
    df1.to_parquet("asof1.pq")
    df2.to_parquet("asof2.pq")

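asof1.pq and asof2.pq are consumed by the parallel merge_asof tests. As a rough illustration of what they exercise (this snippet is illustrative and not part of the script), the two frames can be joined on the nearest earlier timestamp:

import pandas as pd

left = pd.read_parquet("asof1.pq")
right = pd.read_parquet("asof2.pq")
# Each left row is matched with the last right row whose 'time'
# is less than or equal to its own 'time' (both frames are pre-sorted).
merged = pd.merge_asof(left, right, on='time')
print(merged)
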
if __name__ == "__main__":
    print('generation phase')
    ParquetGenerator.gen_kde_pq()
    ParquetGenerator.gen_pq_test()
    generate_spark_data()
    generate_other_data()