-
Notifications
You must be signed in to change notification settings - Fork 2
/
parquet_files.jl
75 lines (67 loc) · 2.64 KB
/
parquet_files.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
"""
$docstring_read_parquet
"""
function read_parquet(data_file;
                      col_select=nothing,
                      skip=0,
                      n_max=Inf,
                      col_names=true)
    # Read a Parquet source into a DataFrame, then optionally narrow it:
    #   data_file  - local path, or an "http://" / "https://" URL to download
    #   col_select - `nothing` (keep all columns), a single column, or a collection
    #                of columns; Symbols are converted to Strings before selection
    #   skip       - number of leading rows to drop (values <= 0 drop nothing)
    #   n_max      - maximum number of rows to keep after `skip`; `Inf` keeps all
    #   col_names  - when `false`, the original column names become the first data
    #                row and the columns are renamed Column1, Column2, ...
    # Returns the resulting DataFrame.

    # Remote sources are fetched completely into memory and read from an IOBuffer;
    # anything else is passed to Parquet2 unchanged as a path.
    if startswith(data_file, "http://") || startswith(data_file, "https://")
        response = HTTP.get(data_file)
        if response.status != 200
            error("Failed to fetch the Parquet file: HTTP status code ", response.status)
        end
        file_to_read = IOBuffer(response.body)
    else
        file_to_read = data_file
    end

    # The whole dataset is materialised first; selection and row limits are applied
    # afterwards on the DataFrame.
    ds = Parquet2.Dataset(file_to_read)
    df = DataFrame(ds; copycols=false)

    if !isnothing(col_select)
        # A bare Symbol/String denotes one column. Without wrapping, it would be
        # iterated element-wise (characters of a String, or a MethodError for a Symbol).
        requested = col_select isa Union{Symbol, AbstractString} ? (col_select,) : col_select
        selected = [c isa Symbol ? string(c) : c for c in requested]
        df = select(df, selected)
    end

    if skip > 0 || !isinf(n_max)
        total = nrow(df)
        start_idx = max(1, Int(skip) + 1)
        # Rows still available from `start_idx` on; <= 0 when `skip` passes the end.
        remaining = total - start_idx + 1
        # Keep the bounds as Int: a Float64 `n_max` (e.g. 5.0) would otherwise turn
        # the range into a float range, which is not a valid row index. `Inf` or any
        # `n_max` beyond the available rows simply clamps to the last row.
        end_idx = n_max >= remaining ? total : start_idx + Int(n_max) - 1
        # An empty range (start_idx > end_idx) yields a zero-row DataFrame.
        df = df[start_idx:end_idx, :]
    end

    if !col_names
        original_names = names(df)
        # Build a one-row frame that stores every column name under that same column,
        # so `vcat` can align it with `df`. `permutedims` (not `transpose`) is used
        # because `transpose` is recursive and is not defined for String elements.
        header_row = DataFrame(permutedims(original_names), original_names)
        # Prepending strings to typed columns widens those columns' element types.
        df = vcat(header_row, df)
        # Replace the real names with generic ones: Column1, Column2, ...
        rename!(df, Symbol.(:Column, 1:ncol(df)))
    end

    return df
end
"""
$docstring_write_parquet
"""
function write_parquet(data, filename::String; buffer::Union{IO, Nothing}=nothing,
                       npages::Union{Int, Dict}=1,
                       compression_codec::Union{Symbol, Dict}=Dict(),
                       column_metadata::Union{Dict, Pair}=Dict(),
                       metadata::Dict=Dict())
    # Write `data` as Parquet via `Parquet2.writefile`, forwarding all writer options.
    # When `buffer` is supplied it is the destination and `filename` is not used;
    # otherwise the data is written to the file at `filename`.
    destination = isnothing(buffer) ? filename : buffer
    return Parquet2.writefile(destination, data;
                              npages=npages,
                              compression_codec=compression_codec,
                              column_metadata=column_metadata,
                              metadata=metadata)
end