Skip to content

Commit 6dcbac9

Browse files
committed
feat: Speed up field lookups in DFSchema
Signed-off-by: Alex Qyoun-ae <4062971+MazterQyou@users.noreply.github.com>
1 parent cf034ba commit 6dcbac9

File tree

1 file changed

+108
-51
lines changed

1 file changed

+108
-51
lines changed

datafusion/common/src/dfschema.rs

Lines changed: 108 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -27,18 +27,20 @@ use crate::Column;
2727

2828
use arrow::compute::can_cast_types;
2929
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
30-
use std::fmt::{Display, Formatter};
30+
use std::fmt::{Debug, Display, Formatter};
3131

3232
/// A reference-counted reference to a `DFSchema`.
3333
pub type DFSchemaRef = Arc<DFSchema>;
3434

3535
/// DFSchema wraps an Arrow schema and adds relation names
36-
#[derive(Debug, Clone, PartialEq, Eq)]
36+
#[derive(Clone, PartialEq, Eq)]
3737
pub struct DFSchema {
3838
/// Fields
3939
fields: Vec<DFField>,
4040
/// Additional metadata in form of key value pairs
4141
metadata: HashMap<String, String>,
42+
/// Field-to-index hash map for fast lookup
43+
field_to_index: HashMap<LookupField, LookupResult>,
4244
}
4345

4446
impl DFSchema {
@@ -47,6 +49,7 @@ impl DFSchema {
4749
Self {
4850
fields: vec![],
4951
metadata: HashMap::new(),
52+
field_to_index: HashMap::new(),
5053
}
5154
}
5255

@@ -101,7 +104,38 @@ impl DFSchema {
101104
)));
102105
}
103106
}
104-
Ok(Self { fields, metadata })
107+
108+
let field_to_index = Self::create_field_to_index_map(&fields);
109+
Ok(Self {
110+
fields,
111+
metadata,
112+
field_to_index,
113+
})
114+
}
115+
116+
fn create_field_to_index_map(
117+
fields: &[DFField],
118+
) -> HashMap<LookupField, LookupResult> {
119+
let mut field_to_index = HashMap::new();
120+
for (index, field) in fields.iter().enumerate() {
121+
let lookup_field = LookupField::new(field.qualifier.as_deref(), field.name());
122+
field_to_index.insert(lookup_field, LookupResult::Exact(index));
123+
if field.qualifier.is_some() {
124+
let unqualified_lookup_field = LookupField::new(None, field.name());
125+
match field_to_index.get(&unqualified_lookup_field) {
126+
None => {
127+
field_to_index
128+
.insert(unqualified_lookup_field, LookupResult::Exact(index));
129+
}
130+
Some(LookupResult::Exact(_)) => {
131+
field_to_index
132+
.insert(unqualified_lookup_field, LookupResult::Ambiguous);
133+
}
134+
Some(LookupResult::Ambiguous) => { /* already ambiguous */ }
135+
}
136+
}
137+
}
138+
field_to_index
105139
}
106140

107141
/// Create a `DFSchema` from an Arrow schema
@@ -138,7 +172,8 @@ impl DFSchema {
138172
self.fields.push(field.clone());
139173
}
140174
}
141-
self.metadata.extend(other_schema.metadata.clone())
175+
self.metadata.extend(other_schema.metadata.clone());
176+
self.field_to_index = Self::create_field_to_index_map(&self.fields);
142177
}
143178

144179
/// Get a list of fields
@@ -171,42 +206,20 @@ impl DFSchema {
171206
qualifier: Option<&str>,
172207
name: &str,
173208
) -> Result<usize> {
174-
let mut matches = self
175-
.fields
176-
.iter()
177-
.enumerate()
178-
.filter(|(_, field)| match (qualifier, &field.qualifier) {
179-
// field to lookup is qualified.
180-
// current field is qualified and not shared between relations, compare both
181-
// qualifier and name.
182-
(Some(q), Some(field_q)) => {
183-
q.to_ascii_lowercase() == field_q.to_ascii_lowercase()
184-
&& field.name().to_ascii_lowercase() == name.to_ascii_lowercase()
185-
}
186-
// field to lookup is qualified but current field is unqualified.
187-
(Some(_), None) => false,
188-
// field to lookup is unqualified, no need to compare qualifier
189-
(None, Some(_)) | (None, None) => {
190-
field.name().to_ascii_lowercase() == name.to_ascii_lowercase()
191-
}
192-
})
193-
.map(|(idx, _)| idx);
194-
match matches.next() {
209+
let lookup_field = LookupField::new(qualifier, name);
210+
match self.field_to_index.get(&lookup_field) {
211+
Some(LookupResult::Exact(idx)) => Ok(*idx),
212+
Some(LookupResult::Ambiguous) => Err(DataFusionError::Plan(format!(
213+
"Ambiguous reference to field named '{}.{}'",
214+
qualifier.unwrap_or("<unqualified>"),
215+
name
216+
))),
195217
None => Err(DataFusionError::Plan(format!(
196218
"No field named '{}.{}'. Valid fields are {}.",
197219
qualifier.unwrap_or("<unqualified>"),
198220
name,
199221
self.get_field_names()
200222
))),
201-
Some(idx) => match matches.next() {
202-
None => Ok(idx),
203-
// found more than one matches
204-
Some(_) => Err(DataFusionError::Internal(format!(
205-
"Ambiguous reference to qualified field named '{}.{}'",
206-
qualifier.unwrap_or("<unqualified>"),
207-
name
208-
))),
209-
},
210223
}
211224
}
212225

@@ -315,31 +328,37 @@ impl DFSchema {
315328

316329
/// Strip all field qualifier in schema
317330
pub fn strip_qualifiers(self) -> Self {
331+
let fields = self
332+
.fields
333+
.into_iter()
334+
.map(|f| f.strip_qualifier())
335+
.collect::<Vec<_>>();
336+
let field_to_index = Self::create_field_to_index_map(&fields);
318337
DFSchema {
319-
fields: self
320-
.fields
321-
.into_iter()
322-
.map(|f| f.strip_qualifier())
323-
.collect(),
338+
fields,
339+
field_to_index,
324340
..self
325341
}
326342
}
327343

328344
/// Replace all field qualifier with new value in schema
329345
pub fn replace_qualifier(self, qualifier: &str) -> Self {
346+
let fields = self
347+
.fields
348+
.into_iter()
349+
.map(|f| {
350+
DFField::new(
351+
Some(qualifier),
352+
f.name(),
353+
f.data_type().to_owned(),
354+
f.is_nullable(),
355+
)
356+
})
357+
.collect::<Vec<_>>();
358+
let field_to_index = Self::create_field_to_index_map(&fields);
330359
DFSchema {
331-
fields: self
332-
.fields
333-
.into_iter()
334-
.map(|f| {
335-
DFField::new(
336-
Some(qualifier),
337-
f.name(),
338-
f.data_type().to_owned(),
339-
f.is_nullable(),
340-
)
341-
})
342-
.collect(),
360+
fields,
361+
field_to_index,
343362
..self
344363
}
345364
}
@@ -362,6 +381,23 @@ impl DFSchema {
362381
}
363382
}
364383

384+
impl Debug for DFSchema {
385+
fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
386+
#[derive(Debug)]
387+
#[allow(dead_code)]
388+
struct DFSchema<'a> {
389+
fields: &'a Vec<DFField>,
390+
metadata: &'a HashMap<String, String>,
391+
}
392+
393+
let debug = DFSchema {
394+
fields: &self.fields,
395+
metadata: &self.metadata,
396+
};
397+
debug.fmt(f)
398+
}
399+
}
400+
365401
impl From<DFSchema> for Schema {
366402
/// Convert DFSchema into a Schema
367403
fn from(df_schema: DFSchema) -> Self {
@@ -603,6 +639,27 @@ impl DFField {
603639
}
604640
}
605641

642+
#[derive(Clone, PartialEq, Eq, Hash)]
643+
struct LookupField {
644+
qualifier: Option<String>,
645+
name: String,
646+
}
647+
648+
impl LookupField {
649+
fn new(qualifier: Option<&str>, name: &str) -> Self {
650+
Self {
651+
qualifier: qualifier.map(|s| s.to_ascii_lowercase()),
652+
name: name.to_ascii_lowercase(),
653+
}
654+
}
655+
}
656+
657+
#[derive(Clone, Copy, PartialEq, Eq)]
658+
enum LookupResult {
659+
Exact(usize),
660+
Ambiguous,
661+
}
662+
606663
#[cfg(test)]
607664
mod tests {
608665
use super::*;

0 commit comments

Comments
 (0)